001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.plugin;
018
019 import org.apache.activemq.advisory.AdvisorySupport;
020 import org.apache.activemq.broker.Broker;
021 import org.apache.activemq.broker.BrokerFilter;
022 import org.apache.activemq.broker.BrokerService;
023 import org.apache.activemq.broker.ConnectionContext;
024 import org.apache.activemq.broker.ProducerBrokerExchange;
025 import org.apache.activemq.broker.region.Destination;
026 import org.apache.activemq.broker.region.DestinationStatistics;
027 import org.apache.activemq.broker.region.RegionBroker;
028 import org.apache.activemq.command.ActiveMQDestination;
029 import org.apache.activemq.command.ActiveMQMapMessage;
030 import org.apache.activemq.command.Message;
031 import org.apache.activemq.command.MessageId;
032 import org.apache.activemq.command.ProducerId;
033 import org.apache.activemq.command.ProducerInfo;
034 import org.apache.activemq.state.ProducerState;
035 import org.apache.activemq.usage.SystemUsage;
036 import org.apache.activemq.util.IdGenerator;
037 import org.apache.activemq.util.LongSequenceGenerator;
038 import org.slf4j.Logger;
039 import org.slf4j.LoggerFactory;
040 import java.io.File;
041 import java.net.URI;
042 import java.util.Set;
043 /**
044 * A StatisticsBroker You can retrieve a Map Message for a Destination - or
045 * Broker containing statistics as key-value pairs The message must contain a
046 * replyTo Destination - else its ignored
047 *
048 */
049 public class StatisticsBroker extends BrokerFilter {
050 private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
051 static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
052 static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
053 private static final IdGenerator ID_GENERATOR = new IdGenerator();
054 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
055 protected final ProducerId advisoryProducerId = new ProducerId();
056
057 /**
058 *
059 * Constructor
060 *
061 * @param next
062 */
063 public StatisticsBroker(Broker next) {
064 super(next);
065 this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
066 }
067
068 /**
069 * Sets the persistence mode
070 *
071 * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
072 * org.apache.activemq.command.Message)
073 */
074 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
075 ActiveMQDestination msgDest = messageSend.getDestination();
076 ActiveMQDestination replyTo = messageSend.getReplyTo();
077 if (replyTo != null) {
078 String physicalName = msgDest.getPhysicalName();
079 boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0,
080 STATS_DESTINATION_PREFIX.length());
081 boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
082 .length());
083 if (destStats) {
084 String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
085 ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType());
086 Set<Destination> set = getDestinations(queryDest);
087 for (Destination dest : set) {
088 DestinationStatistics stats = dest.getDestinationStatistics();
089 if (stats != null) {
090 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
091 statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
092 statsMessage.setLong("size", stats.getMessages().getCount());
093 statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
094 statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
095 statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
096 statsMessage.setLong("expiredCount", stats.getExpired().getCount());
097 statsMessage.setLong("inflightCount", stats.getInflight().getCount());
098 statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
099 statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
100 statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
101 statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
102 statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
103 statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
104 statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
105 statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
106 statsMessage.setLong("producerCount", stats.getProducers().getCount());
107 statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
108 sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
109 }
110 }
111 } else if (brokerStats) {
112 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
113 BrokerService brokerService = getBrokerService();
114 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
115 SystemUsage systemUsage = brokerService.getSystemUsage();
116 DestinationStatistics stats = regionBroker.getDestinationStatistics();
117 statsMessage.setString("brokerName", regionBroker.getBrokerName());
118 statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
119 statsMessage.setLong("size", stats.getMessages().getCount());
120 statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
121 statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
122 statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
123 statsMessage.setLong("expiredCount", stats.getExpired().getCount());
124 statsMessage.setLong("inflightCount", stats.getInflight().getCount());
125 statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
126 statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
127 statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
128 statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
129 statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
130 statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
131 statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
132 statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
133 statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
134 statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
135 statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
136 statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
137 statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
138 statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
139 statsMessage.setLong("producerCount", stats.getProducers().getCount());
140 String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
141 answer = answer != null ? answer : "";
142 statsMessage.setString("openwire", answer);
143 answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
144 answer = answer != null ? answer : "";
145 statsMessage.setString("stomp", answer);
146 answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
147 answer = answer != null ? answer : "";
148 statsMessage.setString("ssl", answer);
149 answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
150 answer = answer != null ? answer : "";
151 statsMessage.setString("stomp+ssl", answer);
152 URI uri = brokerService.getVmConnectorURI();
153 answer = uri != null ? uri.toString() : "";
154 statsMessage.setString("vm", answer);
155 File file = brokerService.getDataDirectoryFile();
156 answer = file != null ? file.getCanonicalPath() : "";
157 statsMessage.setString("dataDirectory", answer);
158 statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
159 sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
160 } else {
161 super.send(producerExchange, messageSend);
162 }
163 } else {
164 super.send(producerExchange, messageSend);
165 }
166 }
167
168 public void start() throws Exception {
169 super.start();
170 LOG.info("Starting StatisticsBroker");
171 }
172
173 public void stop() throws Exception {
174 super.stop();
175 }
176
177 protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
178 throws Exception {
179 msg.setPersistent(false);
180 msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
181 msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId()));
182 msg.setDestination(replyTo);
183 msg.setResponseRequired(false);
184 msg.setProducerId(this.advisoryProducerId);
185 boolean originalFlowControl = context.isProducerFlowControl();
186 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
187 producerExchange.setConnectionContext(context);
188 producerExchange.setMutable(true);
189 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
190 try {
191 context.setProducerFlowControl(false);
192 this.next.send(producerExchange, msg);
193 } finally {
194 context.setProducerFlowControl(originalFlowControl);
195 }
196 }
197 }