001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.sql.Connection;
021 import java.sql.PreparedStatement;
022 import java.sql.SQLException;
023
024 import javax.sql.DataSource;
025
026 import org.apache.activemq.util.Handler;
027 import org.slf4j.Logger;
028 import org.slf4j.LoggerFactory;
029
030 /**
031 * Represents an exclusive lock on a database to avoid multiple brokers running
032 * against the same logical database.
033 *
034 * @org.apache.xbean.XBean element="database-locker"
035 *
036 */
037 public class DefaultDatabaseLocker implements DatabaseLocker {
038 public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 1000;
039 private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class);
040 protected DataSource dataSource;
041 protected Statements statements;
042 protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
043
044 protected Connection connection;
045 protected boolean stopping;
046 protected Handler<Exception> exceptionHandler;
047
048 public DefaultDatabaseLocker() {
049 }
050
051 public DefaultDatabaseLocker(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
052 setPersistenceAdapter(persistenceAdapter);
053 }
054
055 public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException {
056 this.dataSource = adapter.getLockDataSource();
057 this.statements = adapter.getStatements();
058 }
059
060 public void start() throws Exception {
061 stopping = false;
062
063 LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
064 String sql = statements.getLockCreateStatement();
065 LOG.debug("Locking Query is "+sql);
066
067 PreparedStatement statement = null;
068 while (true) {
069 try {
070 connection = dataSource.getConnection();
071 connection.setAutoCommit(false);
072 statement = connection.prepareStatement(sql);
073 statement.execute();
074 break;
075 } catch (Exception e) {
076 try {
077 if (stopping) {
078 throw new Exception(
079 "Cannot start broker as being asked to shut down. "
080 + "Interrupted attempt to acquire lock: "
081 + e, e);
082 }
083 if (exceptionHandler != null) {
084 try {
085 exceptionHandler.handle(e);
086 } catch (Throwable handlerException) {
087 LOG.error( "The exception handler "
088 + exceptionHandler.getClass().getCanonicalName()
089 + " threw this exception: "
090 + handlerException
091 + " while trying to handle this exception: "
092 + e, handlerException);
093 }
094
095 } else {
096 LOG.debug("Lock failure: "+ e, e);
097 }
098 } finally {
099 // Let's make sure the database connection is properly
100 // closed when an error occurs so that we're not leaking
101 // connections
102 if (null != connection) {
103 try {
104 connection.close();
105 } catch (SQLException e1) {
106 LOG.error("Caught exception while closing connection: " + e1, e1);
107 }
108
109 connection = null;
110 }
111 }
112 } finally {
113 if (null != statement) {
114 try {
115 statement.close();
116 } catch (SQLException e1) {
117 LOG.debug("Caught while closing statement: " + e1, e1);
118 }
119 statement = null;
120 }
121 }
122
123 LOG.info("Failed to acquire lock. Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
124 try {
125 Thread.sleep(lockAcquireSleepInterval);
126 } catch (InterruptedException ie) {
127 LOG.warn("Master lock retry sleep interrupted", ie);
128 }
129 }
130
131 LOG.info("Becoming the master on dataSource: " + dataSource);
132 }
133
134 public void stop() throws Exception {
135 stopping = true;
136 try {
137 if (connection != null && !connection.isClosed()) {
138 try {
139 connection.rollback();
140 } catch (SQLException sqle) {
141 LOG.warn("Exception while rollbacking the connection on shutdown", sqle);
142 } finally {
143 try {
144 connection.close();
145 } catch (SQLException ignored) {
146 LOG.debug("Exception while closing connection on shutdown", ignored);
147 }
148 }
149 }
150 } catch (SQLException sqle) {
151 LOG.warn("Exception while checking close status of connection on shutdown", sqle);
152 }
153 }
154
155 public boolean keepAlive() {
156 PreparedStatement statement = null;
157 boolean result = false;
158 try {
159 statement = connection.prepareStatement(statements.getLockUpdateStatement());
160 statement.setLong(1, System.currentTimeMillis());
161 int rows = statement.executeUpdate();
162 if (rows == 1) {
163 result=true;
164 }
165 } catch (Exception e) {
166 LOG.error("Failed to update database lock: " + e, e);
167 } finally {
168 if (statement != null) {
169 try {
170 statement.close();
171 } catch (SQLException e) {
172 LOG.error("Failed to close statement",e);
173 }
174 }
175 }
176 return result;
177 }
178
179 public long getLockAcquireSleepInterval() {
180 return lockAcquireSleepInterval;
181 }
182
183 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
184 this.lockAcquireSleepInterval = lockAcquireSleepInterval;
185 }
186
187 public Handler getExceptionHandler() {
188 return exceptionHandler;
189 }
190
191 public void setExceptionHandler(Handler exceptionHandler) {
192 this.exceptionHandler = exceptionHandler;
193 }
194
195 }