001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.util;
018
019 import java.io.IOException;
020 import java.sql.SQLException;
021 import java.util.concurrent.TimeUnit;
022 import java.util.concurrent.atomic.AtomicBoolean;
023
024 import org.apache.activemq.broker.BrokerService;
025 import org.slf4j.Logger;
026 import org.slf4j.LoggerFactory;
027
028 /**
029 * @org.apache.xbean.XBean
030 */
031 public class DefaultIOExceptionHandler implements IOExceptionHandler {
032
033 private static final Logger LOG = LoggerFactory
034 .getLogger(DefaultIOExceptionHandler.class);
035 private BrokerService broker;
036 private boolean ignoreAllErrors = false;
037 private boolean ignoreNoSpaceErrors = true;
038 private boolean ignoreSQLExceptions = true;
039 private boolean stopStartConnectors = false;
040 private String noSpaceMessage = "space";
041 private String sqlExceptionMessage = ""; // match all
042 private long resumeCheckSleepPeriod = 5*1000;
043 private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
044
045 public void handle(IOException exception) {
046 if (ignoreAllErrors) {
047 LOG.info("Ignoring IO exception, " + exception, exception);
048 return;
049 }
050
051 if (ignoreNoSpaceErrors) {
052 Throwable cause = exception;
053 while (cause != null && cause instanceof IOException) {
054 if (cause.getMessage().contains(noSpaceMessage)) {
055 LOG.info("Ignoring no space left exception, " + exception, exception);
056 return;
057 }
058 cause = cause.getCause();
059 }
060 }
061
062 if (ignoreSQLExceptions) {
063 Throwable cause = exception;
064 while (cause != null) {
065 if (cause instanceof SQLException && cause.getMessage().contains(sqlExceptionMessage)) {
066 LOG.info("Ignoring SQLException, " + exception, cause);
067 return;
068 }
069 cause = cause.getCause();
070 }
071 }
072
073 if (stopStartConnectors) {
074 if (!stopStartInProgress.compareAndSet(false, true)) {
075 // we are already working on it
076 return;
077 }
078 LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
079
080 new Thread("stop transport connectors on IO exception") {
081 public void run() {
082 try {
083 ServiceStopper stopper = new ServiceStopper();
084 broker.stopAllConnectors(stopper);
085 } catch (Exception e) {
086 LOG.warn("Failure occurred while stopping broker connectors", e);
087 }
088 }
089 }.start();
090
091 // resume again
092 new Thread("restart transport connectors post IO exception") {
093 public void run() {
094 try {
095 while (isPersistenceAdapterDown()) {
096 LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
097 TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
098 }
099 broker.startAllConnectors();
100 } catch (Exception e) {
101 LOG.warn("Failure occurred while restarting broker connectors", e);
102 } finally {
103 stopStartInProgress.compareAndSet(true, false);
104 }
105 }
106
107 private boolean isPersistenceAdapterDown() {
108 boolean checkpointSuccess = false;
109 try {
110 broker.getPersistenceAdapter().checkpoint(true);
111 checkpointSuccess = true;
112 } catch (Throwable ignored) {}
113 return !checkpointSuccess;
114 }
115 }.start();
116
117 return;
118 }
119
120 LOG.info("Stopping the broker due to IO exception, " + exception, exception);
121 new Thread("Stopping the broker due to IO exception") {
122 public void run() {
123 try {
124 broker.stop();
125 } catch (Exception e) {
126 LOG.warn("Failure occurred while stopping broker", e);
127 }
128 }
129 }.start();
130 }
131
132 public void setBrokerService(BrokerService broker) {
133 this.broker = broker;
134 }
135
136 public boolean isIgnoreAllErrors() {
137 return ignoreAllErrors;
138 }
139
140 public void setIgnoreAllErrors(boolean ignoreAllErrors) {
141 this.ignoreAllErrors = ignoreAllErrors;
142 }
143
144 public boolean isIgnoreNoSpaceErrors() {
145 return ignoreNoSpaceErrors;
146 }
147
148 public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
149 this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
150 }
151
152 public String getNoSpaceMessage() {
153 return noSpaceMessage;
154 }
155
156 public void setNoSpaceMessage(String noSpaceMessage) {
157 this.noSpaceMessage = noSpaceMessage;
158 }
159
160 public boolean isIgnoreSQLExceptions() {
161 return ignoreSQLExceptions;
162 }
163
164 public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
165 this.ignoreSQLExceptions = ignoreSQLExceptions;
166 }
167
168 public String getSqlExceptionMessage() {
169 return sqlExceptionMessage;
170 }
171
172 public void setSqlExceptionMessage(String sqlExceptionMessage) {
173 this.sqlExceptionMessage = sqlExceptionMessage;
174 }
175
176 public boolean isStopStartConnectors() {
177 return stopStartConnectors;
178 }
179
180 public void setStopStartConnectors(boolean stopStartConnectors) {
181 this.stopStartConnectors = stopStartConnectors;
182 }
183
184 public long getResumeCheckSleepPeriod() {
185 return resumeCheckSleepPeriod;
186 }
187
188 public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
189 this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
190 }
191 }