001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.console.command;
018
019 import java.util.ArrayList;
020 import java.util.Iterator;
021 import java.util.List;
022 import java.util.StringTokenizer;
023
024 import javax.management.MBeanServerConnection;
025 import javax.management.ObjectInstance;
026 import javax.management.ObjectName;
027 import javax.management.openmbean.CompositeData;
028 import javax.management.remote.JMXConnector;
029
030 import org.apache.activemq.console.util.JmxMBeansUtil;
031
032 public class PurgeCommand extends AbstractJmxCommand {
033
034 protected String[] helpFile = new String[] {
035 "Task Usage: Main purge [browse-options] <destinations>",
036 "Description: Delete selected destination's messages that matches the message selector.",
037 "",
038 "Purge Options:",
039 " --msgsel <msgsel1,msglsel2> Add to the search list messages matched by the query similar to",
040 " the messages selector format.",
041 " --jmxurl <url> Set the JMX URL to connect to.",
042 " --pid <pid> Set the pid to connect to (only on Sun JVM).",
043 " --jmxuser <user> Set the JMX user used for authenticating.",
044 " --jmxpassword <password> Set the JMX password used for authenticating.",
045 " --jmxlocal Use the local JMX server instead of a remote one.",
046 " --version Display the version information.",
047 " -h,-?,--help Display the browse broker help information.",
048 "",
049 "Examples:",
050 " Main purge FOO.BAR",
051 " - Delete all the messages in queue FOO.BAR",
052
053 " Main purge --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.*",
054 " - Delete all the messages in the destinations that matches FOO.* and has a JMSMessageID in",
055 " the header field that matches the wildcard *:10, and has a JMSPriority field > 5 in the",
056 " queue FOO.BAR",
057 " * To use wildcard queries, the field must be a string and the query enclosed in ''",
058 "",
059 };
060
061 private final List<String> queryAddObjects = new ArrayList<String>(10);
062 private final List<String> querySubObjects = new ArrayList<String>(10);
063
064 /**
065 * Execute the purge command, which allows you to purge the messages in a
066 * given JMS destination
067 *
068 * @param tokens - command arguments
069 * @throws Exception
070 */
071 protected void runTask(List<String> tokens) throws Exception {
072 try {
073 // If there is no queue name specified, let's select all
074 if (tokens.isEmpty()) {
075 tokens.add("*");
076 }
077
078 // Iterate through the queue names
079 for (Iterator<String> i = tokens.iterator(); i.hasNext();) {
080 List queueList = JmxMBeansUtil.queryMBeans(createJmxConnection(), "Type=Queue,Destination=" + i.next() + ",*");
081
082 for (Iterator j = queueList.iterator(); j.hasNext();) {
083 ObjectName queueName = ((ObjectInstance)j.next()).getObjectName();
084 if (queryAddObjects.isEmpty()) {
085 purgeQueue(queueName);
086 } else {
087 List messages = JmxMBeansUtil.createMessageQueryFilter(createJmxConnection(), queueName).query(queryAddObjects);
088 purgeMessages(queueName, messages);
089 }
090 }
091 }
092 } catch (Exception e) {
093 context.printException(new RuntimeException("Failed to execute purge task. Reason: " + e));
094 throw new Exception(e);
095 }
096 }
097
098 /**
099 * Purge all the messages in the queue
100 *
101 * @param queue - ObjectName of the queue to purge
102 * @throws Exception
103 */
104 public void purgeQueue(ObjectName queue) throws Exception {
105 context.printInfo("Purging all messages in queue: " + queue.getKeyProperty("Destination"));
106 createJmxConnection().invoke(queue, "purge", new Object[] {}, new String[] {});
107 }
108
109 /**
110 * Purge selected messages in the queue
111 *
112 * @param queue - ObjectName of the queue to purge the messages from
113 * @param messages - List of messages to purge
114 * @throws Exception
115 */
116 public void purgeMessages(ObjectName queue, List messages) throws Exception {
117 Object[] param = new Object[1];
118 for (Iterator i = messages.iterator(); i.hasNext();) {
119 CompositeData msg = (CompositeData)i.next();
120 param[0] = "" + msg.get("JMSMessageID");
121 context.printInfo("Removing message: " + param[0] + " from queue: " + queue.getKeyProperty("Destination"));
122 createJmxConnection().invoke(queue, "removeMessage", param, new String[] {
123 "java.lang.String"
124 });
125 }
126 }
127
128 /**
129 * Handle the --msgsel, --xmsgsel.
130 *
131 * @param token - option token to handle
132 * @param tokens - succeeding command arguments
133 * @throws Exception
134 */
135 protected void handleOption(String token, List<String> tokens) throws Exception {
136 // If token is an additive message selector option
137 if (token.startsWith("--msgsel")) {
138
139 // If no message selector is specified, or next token is a new
140 // option
141 if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
142 context.printException(new IllegalArgumentException("Message selector not specified"));
143 return;
144 }
145
146 StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
147 while (queryTokens.hasMoreTokens()) {
148 queryAddObjects.add(queryTokens.nextToken());
149 }
150 } else if (token.startsWith("--xmsgsel")) {
151 // If token is a substractive message selector option
152
153 // If no message selector is specified, or next token is a new
154 // option
155 if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
156 context.printException(new IllegalArgumentException("Message selector not specified"));
157 return;
158 }
159
160 StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
161 while (queryTokens.hasMoreTokens()) {
162 querySubObjects.add(queryTokens.nextToken());
163 }
164
165 } else {
166 // Let super class handle unknown option
167 super.handleOption(token, tokens);
168 }
169 }
170
171 /**
172 * Print the help messages for the browse command
173 */
174 protected void printHelp() {
175 context.printHelp(helpFile);
176 }
177
178 }