001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.util;
018
019 import java.util.Set;
020 import javax.annotation.PostConstruct;
021 import org.apache.activemq.broker.BrokerPluginSupport;
022 import org.apache.activemq.broker.Connection;
023 import org.apache.activemq.broker.ConnectionContext;
024 import org.apache.activemq.broker.ConsumerBrokerExchange;
025 import org.apache.activemq.broker.ProducerBrokerExchange;
026 import org.apache.activemq.broker.region.Destination;
027 import org.apache.activemq.broker.region.MessageReference;
028 import org.apache.activemq.broker.region.Subscription;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.BrokerInfo;
031 import org.apache.activemq.command.ConnectionInfo;
032 import org.apache.activemq.command.ConsumerInfo;
033 import org.apache.activemq.command.DestinationInfo;
034 import org.apache.activemq.command.Message;
035 import org.apache.activemq.command.MessageAck;
036 import org.apache.activemq.command.MessageDispatch;
037 import org.apache.activemq.command.MessageDispatchNotification;
038 import org.apache.activemq.command.MessagePull;
039 import org.apache.activemq.command.ProducerInfo;
040 import org.apache.activemq.command.RemoveSubscriptionInfo;
041 import org.apache.activemq.command.Response;
042 import org.apache.activemq.command.SessionInfo;
043 import org.apache.activemq.command.TransactionId;
044 import org.apache.activemq.usage.Usage;
045 import org.slf4j.Logger;
046 import org.slf4j.LoggerFactory;
047
048 /**
049 * A simple Broker intercepter which allows you to enable/disable logging.
050 *
051 * @org.apache.xbean.XBean
052 */
053
054 public class LoggingBrokerPlugin extends BrokerPluginSupport {
055
056 private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class);
057
058 private boolean logAll = false;
059 private boolean logMessageEvents = false;
060 private boolean logConnectionEvents = true;
061 private boolean logTransactionEvents = false;
062 private boolean logConsumerEvents = false;
063 private boolean logProducerEvents = false;
064 private boolean logInternalEvents = false;
065
066 /**
067 *
068 * @throws Exception
069 * @org.apache.xbean.InitMethod
070 */
071 @PostConstruct
072 public void afterPropertiesSet() throws Exception {
073 LOG.info("Created LoggingBrokerPlugin: " + this.toString());
074 }
075
076 public boolean isLogAll() {
077 return logAll;
078 }
079
080 /**
081 * Logger all Events that go through the Plugin
082 */
083 public void setLogAll(boolean logAll) {
084 this.logAll = logAll;
085 }
086
087 public boolean isLogMessageEvents() {
088 return logMessageEvents;
089 }
090
091 /**
092 * Logger Events that are related to message processing
093 */
094 public void setLogMessageEvents(boolean logMessageEvents) {
095 this.logMessageEvents = logMessageEvents;
096 }
097
098 public boolean isLogConnectionEvents() {
099 return logConnectionEvents;
100 }
101
102 /**
103 * Logger Events that are related to connections and sessions
104 */
105 public void setLogConnectionEvents(boolean logConnectionEvents) {
106 this.logConnectionEvents = logConnectionEvents;
107 }
108
109 public boolean isLogTransactionEvents() {
110 return logTransactionEvents;
111 }
112
113 /**
114 * Logger Events that are related to transaction processing
115 */
116 public void setLogTransactionEvents(boolean logTransactionEvents) {
117 this.logTransactionEvents = logTransactionEvents;
118 }
119
120 public boolean isLogConsumerEvents() {
121 return logConsumerEvents;
122 }
123
124 /**
125 * Logger Events that are related to Consumers
126 */
127 public void setLogConsumerEvents(boolean logConsumerEvents) {
128 this.logConsumerEvents = logConsumerEvents;
129 }
130
131 public boolean isLogProducerEvents() {
132 return logProducerEvents;
133 }
134
135 /**
136 * Logger Events that are related to Producers
137 */
138 public void setLogProducerEvents(boolean logProducerEvents) {
139 this.logProducerEvents = logProducerEvents;
140 }
141
142 public boolean isLogInternalEvents() {
143 return logInternalEvents;
144 }
145
146 /**
147 * Logger Events that are normally internal to the broker
148 */
149 public void setLogInternalEvents(boolean logInternalEvents) {
150 this.logInternalEvents = logInternalEvents;
151 }
152
153 @Override
154 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
155 if (isLogAll() || isLogConsumerEvents()) {
156 LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId()
157 + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
158 if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) {
159 LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId()
160 + ", Last Message Id: " + ack.getLastMessageId());
161 }
162 }
163 super.acknowledge(consumerExchange, ack);
164 }
165
166 @Override
167 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
168 if (isLogAll() || isLogConsumerEvents()) {
169 LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName());
170 }
171 return super.messagePull(context, pull);
172 }
173
174 @Override
175 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
176 if (isLogAll() || isLogConnectionEvents()) {
177 LOG.info("Adding Connection : " + info);
178 }
179 super.addConnection(context, info);
180 }
181
182 @Override
183 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
184 if (isLogAll() || isLogConsumerEvents()) {
185 LOG.info("Adding Consumer : " + info);
186 }
187 return super.addConsumer(context, info);
188 }
189
190 @Override
191 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
192 if (isLogAll() || isLogProducerEvents()) {
193 LOG.info("Adding Producer :" + info);
194 }
195 super.addProducer(context, info);
196 }
197
198 @Override
199 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
200 if (isLogAll() || isLogTransactionEvents()) {
201 LOG.info("Commiting transaction : " + xid.getTransactionKey());
202 }
203 super.commitTransaction(context, xid, onePhase);
204 }
205
206 @Override
207 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
208 if (isLogAll() || isLogConsumerEvents()) {
209 LOG.info("Removing subscription : " + info);
210 }
211 super.removeSubscription(context, info);
212 }
213
214 @Override
215 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
216
217 TransactionId[] result = super.getPreparedTransactions(context);
218 if ((isLogAll() || isLogTransactionEvents()) && result != null) {
219 StringBuffer tids = new StringBuffer();
220 for (TransactionId tid : result) {
221 if (tids.length() > 0) {
222 tids.append(", ");
223 }
224 tids.append(tid.getTransactionKey());
225 }
226 LOG.info("Prepared transactions : " + tids);
227 }
228 return result;
229 }
230
231 @Override
232 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
233 if (isLogAll() || isLogTransactionEvents()) {
234 LOG.info("Preparing transaction : " + xid.getTransactionKey());
235 }
236 return super.prepareTransaction(context, xid);
237 }
238
239 @Override
240 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
241 if (isLogAll() || isLogConnectionEvents()) {
242 LOG.info("Removing Connection : " + info);
243 }
244 super.removeConnection(context, info, error);
245 }
246
247 @Override
248 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
249 if (isLogAll() || isLogConsumerEvents()) {
250 LOG.info("Removing Consumer : " + info);
251 }
252 super.removeConsumer(context, info);
253 }
254
255 @Override
256 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
257 if (isLogAll() || isLogProducerEvents()) {
258 LOG.info("Removing Producer : " + info);
259 }
260 super.removeProducer(context, info);
261 }
262
263 @Override
264 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
265 if (isLogAll() || isLogTransactionEvents()) {
266 LOG.info("Rolling back Transaction : " + xid.getTransactionKey());
267 }
268 super.rollbackTransaction(context, xid);
269 }
270
271 @Override
272 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
273 if (isLogAll() || isLogProducerEvents()) {
274 LOG.info("Sending message : " + messageSend.copy());
275 }
276 super.send(producerExchange, messageSend);
277 }
278
279 @Override
280 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
281 if (isLogAll() || isLogTransactionEvents()) {
282 LOG.info("Beginning transaction : " + xid.getTransactionKey());
283 }
284 super.beginTransaction(context, xid);
285 }
286
287 @Override
288 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
289 if (isLogAll() || isLogTransactionEvents()) {
290 LOG.info("Forgetting transaction : " + transactionId.getTransactionKey());
291 }
292 super.forgetTransaction(context, transactionId);
293 }
294
295 @Override
296 public Connection[] getClients() throws Exception {
297 Connection[] result = super.getClients();
298
299 if (isLogAll() || isLogInternalEvents()) {
300 if (result == null) {
301 LOG.info("Get Clients returned empty list.");
302 } else {
303 StringBuffer cids = new StringBuffer();
304 for (Connection c : result) {
305 cids.append(cids.length() > 0 ? ", " : "");
306 cids.append(c.getConnectionId());
307 }
308 LOG.info("Connected clients : " + cids);
309 }
310 }
311 return super.getClients();
312 }
313
314 @Override
315 public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context,
316 ActiveMQDestination destination, boolean create) throws Exception {
317 if (isLogAll() || isLogInternalEvents()) {
318 LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":"
319 + destination.getPhysicalName());
320 }
321 return super.addDestination(context, destination, create);
322 }
323
324 @Override
325 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
326 throws Exception {
327 if (isLogAll() || isLogInternalEvents()) {
328 LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":"
329 + destination.getPhysicalName());
330 }
331 super.removeDestination(context, destination, timeout);
332 }
333
334 @Override
335 public ActiveMQDestination[] getDestinations() throws Exception {
336 ActiveMQDestination[] result = super.getDestinations();
337 if (isLogAll() || isLogInternalEvents()) {
338 if (result == null) {
339 LOG.info("Get Destinations returned empty list.");
340 } else {
341 StringBuffer destinations = new StringBuffer();
342 for (ActiveMQDestination dest : result) {
343 destinations.append(destinations.length() > 0 ? ", " : "");
344 destinations.append(dest.getPhysicalName());
345 }
346 LOG.info("Get Destinations : " + destinations);
347 }
348 }
349 return result;
350 }
351
352 @Override
353 public void start() throws Exception {
354 if (isLogAll() || isLogInternalEvents()) {
355 LOG.info("Starting " + getBrokerName());
356 }
357 super.start();
358 }
359
360 @Override
361 public void stop() throws Exception {
362 if (isLogAll() || isLogInternalEvents()) {
363 LOG.info("Stopping " + getBrokerName());
364 }
365 super.stop();
366 }
367
368 @Override
369 public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
370 if (isLogAll() || isLogConnectionEvents()) {
371 LOG.info("Adding Session : " + info);
372 }
373 super.addSession(context, info);
374 }
375
376 @Override
377 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
378 if (isLogAll() || isLogConnectionEvents()) {
379 LOG.info("Removing Session : " + info);
380 }
381 super.removeSession(context, info);
382 }
383
384 @Override
385 public void addBroker(Connection connection, BrokerInfo info) {
386 if (isLogAll() || isLogInternalEvents()) {
387 LOG.info("Adding Broker " + info.getBrokerName());
388 }
389 super.addBroker(connection, info);
390 }
391
392 @Override
393 public void removeBroker(Connection connection, BrokerInfo info) {
394 if (isLogAll() || isLogInternalEvents()) {
395 LOG.info("Removing Broker " + info.getBrokerName());
396 }
397 super.removeBroker(connection, info);
398 }
399
400 @Override
401 public BrokerInfo[] getPeerBrokerInfos() {
402 BrokerInfo[] result = super.getPeerBrokerInfos();
403 if (isLogAll() || isLogInternalEvents()) {
404 if (result == null) {
405 LOG.info("Get Peer Broker Infos returned empty list.");
406 } else {
407 StringBuffer peers = new StringBuffer();
408 for (BrokerInfo bi : result) {
409 peers.append(peers.length() > 0 ? ", " : "");
410 peers.append(bi.getBrokerName());
411 }
412 LOG.info("Get Peer Broker Infos : " + peers);
413 }
414 }
415 return result;
416 }
417
418 @Override
419 public void preProcessDispatch(MessageDispatch messageDispatch) {
420 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
421 LOG.info("preProcessDispatch :" + messageDispatch);
422 }
423 super.preProcessDispatch(messageDispatch);
424 }
425
426 @Override
427 public void postProcessDispatch(MessageDispatch messageDispatch) {
428 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
429 LOG.info("postProcessDispatch :" + messageDispatch);
430 }
431 super.postProcessDispatch(messageDispatch);
432 }
433
434 @Override
435 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
436 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
437 LOG.info("ProcessDispatchNotification :" + messageDispatchNotification);
438 }
439 super.processDispatchNotification(messageDispatchNotification);
440 }
441
442 @Override
443 public Set<ActiveMQDestination> getDurableDestinations() {
444 Set<ActiveMQDestination> result = super.getDurableDestinations();
445 if (isLogAll() || isLogInternalEvents()) {
446 if (result == null) {
447 LOG.info("Get Durable Destinations returned empty list.");
448 } else {
449 StringBuffer destinations = new StringBuffer();
450 for (ActiveMQDestination dest : result) {
451 destinations.append(destinations.length() > 0 ? ", " : "");
452 destinations.append(dest.getPhysicalName());
453 }
454 LOG.info("Get Durable Destinations : " + destinations);
455 }
456 }
457 return result;
458 }
459
460 @Override
461 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
462 if (isLogAll() || isLogInternalEvents()) {
463 LOG.info("Adding destination info : " + info);
464 }
465 super.addDestinationInfo(context, info);
466 }
467
468 @Override
469 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
470 if (isLogAll() || isLogInternalEvents()) {
471 LOG.info("Removing destination info : " + info);
472 }
473 super.removeDestinationInfo(context, info);
474 }
475
476 @Override
477 public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
478 if (isLogAll() || isLogInternalEvents()) {
479 String msg = "Unable to display message.";
480
481 msg = message.getMessage().toString();
482
483 LOG.info("Message has expired : " + msg);
484 }
485 super.messageExpired(context, message, subscription);
486 }
487
488 @Override
489 public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
490 Subscription subscription) {
491 if (isLogAll() || isLogInternalEvents()) {
492 String msg = "Unable to display message.";
493
494 msg = messageReference.getMessage().toString();
495
496 LOG.info("Sending to DLQ : " + msg);
497 }
498 super.sendToDeadLetterQueue(context, messageReference, subscription);
499 }
500
501 @Override
502 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
503 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
504 LOG.info("Fast Producer : " + producerInfo);
505 }
506 super.fastProducer(context, producerInfo);
507 }
508
509 @Override
510 public void isFull(ConnectionContext context, Destination destination, Usage usage) {
511 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
512 LOG.info("Destination is full : " + destination.getName());
513 }
514 super.isFull(context, destination, usage);
515 }
516
517 @Override
518 public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
519 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
520 String msg = "Unable to display message.";
521
522 msg = messageReference.getMessage().toString();
523
524 LOG.info("Message consumed : " + msg);
525 }
526 super.messageConsumed(context, messageReference);
527 }
528
529 @Override
530 public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
531 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
532 String msg = "Unable to display message.";
533
534 msg = messageReference.getMessage().toString();
535
536 LOG.info("Message delivered : " + msg);
537 }
538 super.messageDelivered(context, messageReference);
539 }
540
541 @Override
542 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
543 if (isLogAll() || isLogInternalEvents()) {
544 String msg = "Unable to display message.";
545
546 msg = messageReference.getMessage().toString();
547
548 LOG.info("Message discarded : " + msg);
549 }
550 super.messageDiscarded(context, sub, messageReference);
551 }
552
553 @Override
554 public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
555 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
556 LOG.info("Detected slow consumer on " + destination.getName());
557 StringBuffer buf = new StringBuffer("Connection(");
558 buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId());
559 buf.append(") Session(");
560 buf.append(subs.getConsumerInfo().getConsumerId().getSessionId());
561 buf.append(")");
562 LOG.info(buf.toString());
563 }
564 super.slowConsumer(context, destination, subs);
565 }
566
567 @Override
568 public void nowMasterBroker() {
569 if (isLogAll() || isLogInternalEvents()) {
570 LOG.info("Is now the master broker : " + getBrokerName());
571 }
572 super.nowMasterBroker();
573 }
574
575 @Override
576 public String toString() {
577 StringBuffer buf = new StringBuffer();
578 buf.append("LoggingBrokerPlugin(");
579 buf.append("logAll=");
580 buf.append(isLogAll());
581 buf.append(", logConnectionEvents=");
582 buf.append(isLogConnectionEvents());
583 buf.append(", logConsumerEvents=");
584 buf.append(isLogConsumerEvents());
585 buf.append(", logProducerEvents=");
586 buf.append(isLogProducerEvents());
587 buf.append(", logMessageEvents=");
588 buf.append(isLogMessageEvents());
589 buf.append(", logTransactionEvents=");
590 buf.append(isLogTransactionEvents());
591 buf.append(", logInternalEvents=");
592 buf.append(isLogInternalEvents());
593 buf.append(")");
594 return buf.toString();
595 }
596 }