/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017    package org.apache.activemq.util;
018    
019    import java.io.IOException;
020    import java.sql.SQLException;
021    import java.util.concurrent.TimeUnit;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    
024    import org.apache.activemq.broker.BrokerService;
025    import org.slf4j.Logger;
026    import org.slf4j.LoggerFactory;
027    
028    /**
029     * @org.apache.xbean.XBean
030     */
031     public class DefaultIOExceptionHandler implements IOExceptionHandler {
032    
033        private static final Logger LOG = LoggerFactory
034                .getLogger(DefaultIOExceptionHandler.class);
035        private BrokerService broker;
036        private boolean ignoreAllErrors = false;
037        private boolean ignoreNoSpaceErrors = true;
038        private boolean ignoreSQLExceptions = true;
039        private boolean stopStartConnectors = false;
040        private String noSpaceMessage = "space";
041        private String sqlExceptionMessage = ""; // match all
042        private long resumeCheckSleepPeriod = 5*1000;
043        private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
044    
045        public void handle(IOException exception) {
046            if (ignoreAllErrors) {
047                LOG.info("Ignoring IO exception, " + exception, exception);
048                return;
049            }
050    
051            if (ignoreNoSpaceErrors) {
052                Throwable cause = exception;
053                while (cause != null && cause instanceof IOException) {
054                    String message = cause.getMessage();
055                    if (message != null && message.contains(noSpaceMessage)) {
056                        LOG.info("Ignoring no space left exception, " + exception, exception);
057                        return;
058                    }
059                    cause = cause.getCause();
060                }
061            }
062    
063            if (ignoreSQLExceptions) {
064                Throwable cause = exception;
065                while (cause != null) {
066                    String message = cause.getMessage();
067                    if (cause instanceof SQLException && message.contains(sqlExceptionMessage)) {
068                        LOG.info("Ignoring SQLException, " + exception, cause);
069                        return;
070                    }
071                    cause = cause.getCause();
072                }
073            }
074    
075            if (stopStartConnectors) {
076                if (!stopStartInProgress.compareAndSet(false, true)) {
077                    // we are already working on it
078                    return;
079                }
080                LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
081    
082                new Thread("stop transport connectors on IO exception") {
083                    public void run() {
084                        try {
085                            ServiceStopper stopper = new ServiceStopper();
086                            broker.stopAllConnectors(stopper);
087                        } catch (Exception e) {
088                            LOG.warn("Failure occurred while stopping broker connectors", e);
089                        }
090                    }
091                }.start();
092    
093                // resume again
094                new Thread("restart transport connectors post IO exception") {
095                    public void run() {
096                        try {
097                            while (isPersistenceAdapterDown()) {
098                                LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
099                                TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
100                            }
101                            broker.startAllConnectors();
102                        } catch (Exception e) {
103                            LOG.warn("Failure occurred while restarting broker connectors", e);
104                        } finally {
105                            stopStartInProgress.compareAndSet(true, false);
106                        }
107                    }
108    
109                    private boolean isPersistenceAdapterDown() {
110                        boolean checkpointSuccess = false;
111                        try {
112                            broker.getPersistenceAdapter().checkpoint(true);
113                            checkpointSuccess = true;
114                        } catch (Throwable ignored) {}
115                        return !checkpointSuccess;
116                    }
117                }.start();
118    
119                return;
120            }
121    
122            LOG.info("Stopping the broker due to IO exception, " + exception, exception);
123            new Thread("Stopping the broker due to IO exception") {
124                public void run() {
125                    try {
126                        broker.stop();
127                    } catch (Exception e) {
128                        LOG.warn("Failure occurred while stopping broker", e);
129                    }
130                }
131            }.start();
132        }
133    
134        public void setBrokerService(BrokerService broker) {
135            this.broker = broker;
136        }
137    
138        public boolean isIgnoreAllErrors() {
139            return ignoreAllErrors;
140        }
141    
142        public void setIgnoreAllErrors(boolean ignoreAllErrors) {
143            this.ignoreAllErrors = ignoreAllErrors;
144        }
145    
146        public boolean isIgnoreNoSpaceErrors() {
147            return ignoreNoSpaceErrors;
148        }
149    
150        public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
151            this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
152        }
153    
154        public String getNoSpaceMessage() {
155            return noSpaceMessage;
156        }
157    
158        public void setNoSpaceMessage(String noSpaceMessage) {
159            this.noSpaceMessage = noSpaceMessage;
160        }
161    
162        public boolean isIgnoreSQLExceptions() {
163            return ignoreSQLExceptions;
164        }
165    
166        public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
167            this.ignoreSQLExceptions = ignoreSQLExceptions;
168        }
169    
170        public String getSqlExceptionMessage() {
171            return sqlExceptionMessage;
172        }
173    
174        public void setSqlExceptionMessage(String sqlExceptionMessage) {
175            this.sqlExceptionMessage = sqlExceptionMessage;
176        }
177    
178        public boolean isStopStartConnectors() {
179            return stopStartConnectors;
180        }
181    
182        public void setStopStartConnectors(boolean stopStartConnectors) {
183            this.stopStartConnectors = stopStartConnectors;
184        }
185    
186        public long getResumeCheckSleepPeriod() {
187            return resumeCheckSleepPeriod;
188        }
189    
190        public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
191            this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
192        }
193    }