001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.sql.Connection;
022 import java.sql.SQLException;
023 import java.util.Collections;
024 import java.util.Set;
025 import java.util.concurrent.ScheduledFuture;
026 import java.util.concurrent.ScheduledThreadPoolExecutor;
027 import java.util.concurrent.ThreadFactory;
028 import java.util.concurrent.TimeUnit;
029
030 import javax.sql.DataSource;
031
032 import org.apache.activemq.ActiveMQMessageAudit;
033 import org.apache.activemq.broker.BrokerService;
034 import org.apache.activemq.broker.BrokerServiceAware;
035 import org.apache.activemq.broker.ConnectionContext;
036 import org.apache.activemq.command.ActiveMQDestination;
037 import org.apache.activemq.command.ActiveMQQueue;
038 import org.apache.activemq.command.ActiveMQTopic;
039 import org.apache.activemq.command.Message;
040 import org.apache.activemq.command.MessageId;
041 import org.apache.activemq.command.ProducerId;
042 import org.apache.activemq.openwire.OpenWireFormat;
043 import org.apache.activemq.store.MessageStore;
044 import org.apache.activemq.store.PersistenceAdapter;
045 import org.apache.activemq.store.TopicMessageStore;
046 import org.apache.activemq.store.TransactionStore;
047 import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
048 import org.apache.activemq.store.memory.MemoryTransactionStore;
049 import org.apache.activemq.usage.SystemUsage;
050 import org.apache.activemq.util.ByteSequence;
051 import org.apache.activemq.util.FactoryFinder;
052 import org.apache.activemq.util.IOExceptionSupport;
053 import org.apache.activemq.util.LongSequenceGenerator;
054 import org.apache.activemq.wireformat.WireFormat;
055 import org.slf4j.Logger;
056 import org.slf4j.LoggerFactory;
057
058 /**
059 * A {@link PersistenceAdapter} implementation using JDBC for persistence
060 * storage.
061 *
 * This persistence adapter will correctly remember prepared XA transactions,
 * but it will not keep track of local transaction commits, so that operations
 * performed against the message store are done as a single unit of work (UOW).
065 *
066 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
067 *
068 *
069 */
070 public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
071 BrokerServiceAware {
072
/** Logger used by instance methods and by the static {@link #log(String, SQLException)} helper. */
private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
// Finders handed to loadAdapter(...) to look up driver-specific overrides.
// They are never reassigned in this class, hence final.
private static final FactoryFinder adapterFactoryFinder = new FactoryFinder(
    "META-INF/services/org/apache/activemq/store/jdbc/");
private static final FactoryFinder lockFactoryFinder = new FactoryFinder(
    "META-INF/services/org/apache/activemq/store/jdbc/lock/");

// Wire format used to unmarshal stored messages and passed to the message stores.
private WireFormat wireFormat = new OpenWireFormat();
private BrokerService brokerService;
// Lazily created by getStatements() when not injected.
private Statements statements;
// Lazily created by getAdapter() when not injected.
private JDBCAdapter adapter;
// Created on demand by createTransactionStore(); when set, message stores are proxied through it.
private MemoryTransactionStore transactionStore;
// Lazily created by getScheduledThreadPoolExecutor() when not injected.
private ScheduledThreadPoolExecutor clockDaemon;
// Handles of the scheduled cleanup and lock keep-alive tasks; cancelled in stop().
private ScheduledFuture<?> cleanupTicket, keepAliveTicket;
// Milliseconds between periodic cleanup runs (5 minutes); values <= 0 disable the periodic task.
private int cleanupPeriod = 1000 * 60 * 5;
private boolean useExternalMessageReferences;
private boolean useDatabaseLock = true;
// Milliseconds between lock keep-alive runs (30 seconds); values <= 0 disable the keep-alive task.
private long lockKeepAlivePeriod = 1000*30;
private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
// Lazily loaded by getDatabaseLocker() when not injected and database locking is enabled.
private DatabaseLocker databaseLocker;
private boolean createTablesOnStartup = true;
// Falls back to the main data source in getLockDataSource() when not set.
private DataSource lockDataSource;
// Only applied to new transaction contexts when greater than zero.
private int transactionIsolation;

// Second constructor argument of the ActiveMQMessageAudit created in createMessageAudit().
protected int maxProducersToAudit=1024;
// First constructor argument of the ActiveMQMessageAudit created in createMessageAudit().
protected int maxAuditDepth=1000;
protected boolean enableAudit=false;
// Depth argument handed to the adapter's message id scan.
protected int auditRecoveryDepth = 1024;
// Remains null until createMessageAudit() runs with enableAudit set.
protected ActiveMQMessageAudit audit;

protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
// Forwarded to the adapter in setAdapter(...).
protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
104
105 public JDBCPersistenceAdapter() {
106 }
107
/**
 * Creates an adapter for the given data source and wire format.
 *
 * @param ds the data source handed to the superclass constructor
 * @param wireFormat the wire format that replaces the default one
 */
public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
    super(ds);
    this.wireFormat = wireFormat;
}
112
113 public Set<ActiveMQDestination> getDestinations() {
114 // Get a connection and insert the message into the DB.
115 TransactionContext c = null;
116 try {
117 c = getTransactionContext();
118 return getAdapter().doGetDestinations(c);
119 } catch (IOException e) {
120 return emptyDestinationSet();
121 } catch (SQLException e) {
122 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
123 return emptyDestinationSet();
124 } finally {
125 if (c != null) {
126 try {
127 c.close();
128 } catch (Throwable e) {
129 }
130 }
131 }
132 }
133
134 @SuppressWarnings("unchecked")
135 private Set<ActiveMQDestination> emptyDestinationSet() {
136 return Collections.EMPTY_SET;
137 }
138
139 protected void createMessageAudit() {
140 if (enableAudit && audit == null) {
141 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
142 TransactionContext c = null;
143
144 try {
145 c = getTransactionContext();
146 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
147 public void messageId(MessageId id) {
148 audit.isDuplicate(id);
149 }
150 });
151 } catch (Exception e) {
152 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
153 } finally {
154 if (c != null) {
155 try {
156 c.close();
157 } catch (Throwable e) {
158 }
159 }
160 }
161 }
162 }
163
164 public void initSequenceIdGenerator() {
165 TransactionContext c = null;
166 try {
167 c = getTransactionContext();
168 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
169 public void messageId(MessageId id) {
170 audit.isDuplicate(id);
171 }
172 });
173 } catch (Exception e) {
174 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
175 } finally {
176 if (c != null) {
177 try {
178 c.close();
179 } catch (Throwable e) {
180 }
181 }
182 }
183
184 }
185
186 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
187 MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
188 if (transactionStore != null) {
189 rc = transactionStore.proxy(rc);
190 }
191 return rc;
192 }
193
194 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
195 TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
196 if (transactionStore != null) {
197 rc = transactionStore.proxy(rc);
198 }
199 return rc;
200 }
201
202 /**
203 * Cleanup method to remove any state associated with the given destination
204 * No state retained.... nothing to do
205 *
206 * @param destination Destination to forget
207 */
208 public void removeQueueMessageStore(ActiveMQQueue destination) {
209 }
210
211 /**
212 * Cleanup method to remove any state associated with the given destination
213 * No state retained.... nothing to do
214 *
215 * @param destination Destination to forget
216 */
217 public void removeTopicMessageStore(ActiveMQTopic destination) {
218 }
219
220 public TransactionStore createTransactionStore() throws IOException {
221 if (transactionStore == null) {
222 transactionStore = new MemoryTransactionStore(this);
223 }
224 return this.transactionStore;
225 }
226
227 public long getLastMessageBrokerSequenceId() throws IOException {
228 TransactionContext c = getTransactionContext();
229 try {
230 long seq = getAdapter().doGetLastMessageStoreSequenceId(c);
231 sequenceGenerator.setLastSequenceId(seq);
232 long brokerSeq = 0;
233 if (seq != 0) {
234 byte[] msg = getAdapter().doGetMessageById(c, seq);
235 if (msg != null) {
236 Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
237 brokerSeq = last.getMessageId().getBrokerSequenceId();
238 } else {
239 LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
240 }
241 }
242 return brokerSeq;
243 } catch (SQLException e) {
244 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
245 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
246 } finally {
247 c.close();
248 }
249 }
250
251 public long getLastProducerSequenceId(ProducerId id) throws IOException {
252 TransactionContext c = getTransactionContext();
253 try {
254 return getAdapter().doGetLastProducerSequenceId(c, id);
255 } catch (SQLException e) {
256 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
257 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
258 } finally {
259 c.close();
260 }
261 }
262
263
264 public void start() throws Exception {
265 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
266
267 if (isCreateTablesOnStartup()) {
268 TransactionContext transactionContext = getTransactionContext();
269 transactionContext.begin();
270 try {
271 try {
272 getAdapter().doCreateTables(transactionContext);
273 } catch (SQLException e) {
274 LOG.warn("Cannot create tables due to: " + e);
275 JDBCPersistenceAdapter.log("Failure Details: ", e);
276 }
277 } finally {
278 transactionContext.commit();
279 }
280 }
281
282 if (isUseDatabaseLock()) {
283 DatabaseLocker service = getDatabaseLocker();
284 if (service == null) {
285 LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
286 } else {
287 service.start();
288 if (lockKeepAlivePeriod > 0) {
289 keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
290 public void run() {
291 databaseLockKeepAlive();
292 }
293 }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
294 }
295 if (brokerService != null) {
296 brokerService.getBroker().nowMasterBroker();
297 }
298 }
299 }
300
301 cleanup();
302
303 // Cleanup the db periodically.
304 if (cleanupPeriod > 0) {
305 cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
306 public void run() {
307 cleanup();
308 }
309 }, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS);
310 }
311
312 createMessageAudit();
313 }
314
315 public synchronized void stop() throws Exception {
316 if (cleanupTicket != null) {
317 cleanupTicket.cancel(true);
318 cleanupTicket = null;
319 }
320 if (keepAliveTicket != null) {
321 keepAliveTicket.cancel(false);
322 keepAliveTicket = null;
323 }
324
325 // do not shutdown clockDaemon as it may kill the thread initiating shutdown
326 DatabaseLocker service = getDatabaseLocker();
327 if (service != null) {
328 service.stop();
329 }
330 }
331
332 public void cleanup() {
333 TransactionContext c = null;
334 try {
335 LOG.debug("Cleaning up old messages.");
336 c = getTransactionContext();
337 getAdapter().doDeleteOldMessages(c, false);
338 getAdapter().doDeleteOldMessages(c, true);
339 } catch (IOException e) {
340 LOG.warn("Old message cleanup failed due to: " + e, e);
341 } catch (SQLException e) {
342 LOG.warn("Old message cleanup failed due to: " + e);
343 JDBCPersistenceAdapter.log("Failure Details: ", e);
344 } finally {
345 if (c != null) {
346 try {
347 c.close();
348 } catch (Throwable e) {
349 }
350 }
351 LOG.debug("Cleanup done.");
352 }
353 }
354
355 public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
356 this.clockDaemon = clockDaemon;
357 }
358
359 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
360 if (clockDaemon == null) {
361 clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
362 public Thread newThread(Runnable runnable) {
363 Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
364 thread.setDaemon(true);
365 return thread;
366 }
367 });
368 }
369 return clockDaemon;
370 }
371
372 public JDBCAdapter getAdapter() throws IOException {
373 if (adapter == null) {
374 setAdapter(createAdapter());
375 }
376 return adapter;
377 }
378
379 public DatabaseLocker getDatabaseLocker() throws IOException {
380 if (databaseLocker == null && isUseDatabaseLock()) {
381 setDatabaseLocker(loadDataBaseLocker());
382 }
383 return databaseLocker;
384 }
385
386 /**
387 * Sets the database locker strategy to use to lock the database on startup
388 * @throws IOException
389 */
390 public void setDatabaseLocker(DatabaseLocker locker) throws IOException {
391 databaseLocker = locker;
392 databaseLocker.setPersistenceAdapter(this);
393 databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
394 }
395
396 public DataSource getLockDataSource() throws IOException {
397 if (lockDataSource == null) {
398 lockDataSource = getDataSource();
399 if (lockDataSource == null) {
400 throw new IllegalArgumentException(
401 "No dataSource property has been configured");
402 }
403 } else {
404 LOG.info("Using a separate dataSource for locking: "
405 + lockDataSource);
406 }
407 return lockDataSource;
408 }
409
410 public void setLockDataSource(DataSource dataSource) {
411 this.lockDataSource = dataSource;
412 }
413
414 public BrokerService getBrokerService() {
415 return brokerService;
416 }
417
418 public void setBrokerService(BrokerService brokerService) {
419 this.brokerService = brokerService;
420 }
421
422 /**
423 * @throws IOException
424 */
425 protected JDBCAdapter createAdapter() throws IOException {
426
427 adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
428
429 // Use the default JDBC adapter if the
430 // Database type is not recognized.
431 if (adapter == null) {
432 adapter = new DefaultJDBCAdapter();
433 LOG.debug("Using default JDBC Adapter: " + adapter);
434 }
435 return adapter;
436 }
437
438 private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
439 Object adapter = null;
440 TransactionContext c = getTransactionContext();
441 try {
442 try {
443 // Make the filename file system safe.
444 String dirverName = c.getConnection().getMetaData().getDriverName();
445 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
446
447 try {
448 adapter = finder.newInstance(dirverName);
449 LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
450 } catch (Throwable e) {
451 LOG.info("Database " + kind + " driver override not found for : [" + dirverName
452 + "]. Will use default implementation.");
453 }
454 } catch (SQLException e) {
455 LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
456 + e.getMessage());
457 JDBCPersistenceAdapter.log("Failure Details: ", e);
458 }
459 } finally {
460 c.close();
461 }
462 return adapter;
463 }
464
465 public void setAdapter(JDBCAdapter adapter) {
466 this.adapter = adapter;
467 this.adapter.setStatements(getStatements());
468 this.adapter.setMaxRows(getMaxRows());
469 }
470
471 public WireFormat getWireFormat() {
472 return wireFormat;
473 }
474
475 public void setWireFormat(WireFormat wireFormat) {
476 this.wireFormat = wireFormat;
477 }
478
479 public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
480 if (context == null) {
481 return getTransactionContext();
482 } else {
483 TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
484 if (answer == null) {
485 answer = getTransactionContext();
486 context.setLongTermStoreContext(answer);
487 }
488 return answer;
489 }
490 }
491
492 public TransactionContext getTransactionContext() throws IOException {
493 TransactionContext answer = new TransactionContext(this);
494 if (transactionIsolation > 0) {
495 answer.setTransactionIsolation(transactionIsolation);
496 }
497 return answer;
498 }
499
500 public void beginTransaction(ConnectionContext context) throws IOException {
501 TransactionContext transactionContext = getTransactionContext(context);
502 transactionContext.begin();
503 }
504
505 public void commitTransaction(ConnectionContext context) throws IOException {
506 TransactionContext transactionContext = getTransactionContext(context);
507 transactionContext.commit();
508 }
509
510 public void rollbackTransaction(ConnectionContext context) throws IOException {
511 TransactionContext transactionContext = getTransactionContext(context);
512 transactionContext.rollback();
513 }
514
515 public int getCleanupPeriod() {
516 return cleanupPeriod;
517 }
518
519 /**
520 * Sets the number of milliseconds until the database is attempted to be
521 * cleaned up for durable topics
522 */
523 public void setCleanupPeriod(int cleanupPeriod) {
524 this.cleanupPeriod = cleanupPeriod;
525 }
526
527 public void deleteAllMessages() throws IOException {
528 TransactionContext c = getTransactionContext();
529 try {
530 getAdapter().doDropTables(c);
531 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
532 getAdapter().doCreateTables(c);
533 LOG.info("Persistence store purged.");
534 } catch (SQLException e) {
535 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
536 throw IOExceptionSupport.create(e);
537 } finally {
538 c.close();
539 }
540 }
541
542 public boolean isUseExternalMessageReferences() {
543 return useExternalMessageReferences;
544 }
545
546 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
547 this.useExternalMessageReferences = useExternalMessageReferences;
548 }
549
550 public boolean isCreateTablesOnStartup() {
551 return createTablesOnStartup;
552 }
553
554 /**
555 * Sets whether or not tables are created on startup
556 */
557 public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
558 this.createTablesOnStartup = createTablesOnStartup;
559 }
560
561 public boolean isUseDatabaseLock() {
562 return useDatabaseLock;
563 }
564
565 /**
566 * Sets whether or not an exclusive database lock should be used to enable
567 * JDBC Master/Slave. Enabled by default.
568 */
569 public void setUseDatabaseLock(boolean useDatabaseLock) {
570 this.useDatabaseLock = useDatabaseLock;
571 }
572
573 public static void log(String msg, SQLException e) {
574 String s = msg + e.getMessage();
575 while (e.getNextException() != null) {
576 e = e.getNextException();
577 s += ", due to: " + e.getMessage();
578 }
579 LOG.warn(s, e);
580 }
581
582 public Statements getStatements() {
583 if (statements == null) {
584 statements = new Statements();
585 }
586 return statements;
587 }
588
589 public void setStatements(Statements statements) {
590 this.statements = statements;
591 }
592
593 /**
594 * @param usageManager The UsageManager that is controlling the
595 * destination's memory usage.
596 */
597 public void setUsageManager(SystemUsage usageManager) {
598 }
599
600 protected void databaseLockKeepAlive() {
601 boolean stop = false;
602 try {
603 DatabaseLocker locker = getDatabaseLocker();
604 if (locker != null) {
605 if (!locker.keepAlive()) {
606 stop = true;
607 }
608 }
609 } catch (IOException e) {
610 LOG.error("Failed to get database when trying keepalive: " + e, e);
611 }
612 if (stop) {
613 stopBroker();
614 }
615 }
616
617 protected void stopBroker() {
618 // we can no longer keep the lock so lets fail
619 LOG.info("No longer able to keep the exclusive lock so giving up being a master");
620 try {
621 brokerService.stop();
622 } catch (Exception e) {
623 LOG.warn("Failure occurred while stopping broker");
624 }
625 }
626
627 protected DatabaseLocker loadDataBaseLocker() throws IOException {
628 DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
629 if (locker == null) {
630 locker = new DefaultDatabaseLocker();
631 LOG.debug("Using default JDBC Locker: " + locker);
632 }
633 return locker;
634 }
635
636 public void setBrokerName(String brokerName) {
637 }
638
639 public String toString() {
640 return "JDBCPersistenceAdapter(" + super.toString() + ")";
641 }
642
643 public void setDirectory(File dir) {
644 }
645
646 // interesting bit here is proof that DB is ok
647 public void checkpoint(boolean sync) throws IOException {
648 // by pass TransactionContext to avoid IO Exception handler
649 Connection connection = null;
650 try {
651 connection = getDataSource().getConnection();
652 } catch (SQLException e) {
653 LOG.debug("Could not get JDBC connection for checkpoint: " + e);
654 throw IOExceptionSupport.create(e);
655 } finally {
656 if (connection != null) {
657 try {
658 connection.close();
659 } catch (Throwable ignored) {
660 }
661 }
662 }
663 }
664
665 public long size(){
666 return 0;
667 }
668
669 public long getLockKeepAlivePeriod() {
670 return lockKeepAlivePeriod;
671 }
672
673 public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
674 this.lockKeepAlivePeriod = lockKeepAlivePeriod;
675 }
676
677 public long getLockAcquireSleepInterval() {
678 return lockAcquireSleepInterval;
679 }
680
681 /**
682 * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
683 * not applied if DataBaseLocker is injected.
684 */
685 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
686 this.lockAcquireSleepInterval = lockAcquireSleepInterval;
687 }
688
689 /**
690 * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
691 * This allowable dirty isolation level may not be achievable in clustered DB environments
692 * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
693 * see isolation level constants in {@link java.sql.Connection}
694 * @param transactionIsolation the isolation level to use
695 */
696 public void setTransactionIsolation(int transactionIsolation) {
697 this.transactionIsolation = transactionIsolation;
698 }
699
700 public int getMaxProducersToAudit() {
701 return maxProducersToAudit;
702 }
703
704 public void setMaxProducersToAudit(int maxProducersToAudit) {
705 this.maxProducersToAudit = maxProducersToAudit;
706 }
707
708 public int getMaxAuditDepth() {
709 return maxAuditDepth;
710 }
711
712 public void setMaxAuditDepth(int maxAuditDepth) {
713 this.maxAuditDepth = maxAuditDepth;
714 }
715
716 public boolean isEnableAudit() {
717 return enableAudit;
718 }
719
720 public void setEnableAudit(boolean enableAudit) {
721 this.enableAudit = enableAudit;
722 }
723
724 public int getAuditRecoveryDepth() {
725 return auditRecoveryDepth;
726 }
727
728 public void setAuditRecoveryDepth(int auditRecoveryDepth) {
729 this.auditRecoveryDepth = auditRecoveryDepth;
730 }
731
732 public long getNextSequenceId() {
733 synchronized(sequenceGenerator) {
734 return sequenceGenerator.getNextSequenceId();
735 }
736 }
737
738 public int getMaxRows() {
739 return maxRows;
740 }
741
742 /*
743 * the max rows return from queries, with sparse selectors this may need to be increased
744 */
745 public void setMaxRows(int maxRows) {
746 this.maxRows = maxRows;
747 }
748 }