001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.util.HashMap;
020 import java.util.Map;
021 import java.util.concurrent.atomic.AtomicLong;
022 import javax.jms.Destination;
023 import javax.jms.IllegalStateException;
024 import javax.jms.InvalidDestinationException;
025 import javax.jms.JMSException;
026 import javax.jms.Message;
027 import org.apache.activemq.command.ActiveMQDestination;
028 import org.apache.activemq.command.ProducerAck;
029 import org.apache.activemq.command.ProducerId;
030 import org.apache.activemq.command.ProducerInfo;
031 import org.apache.activemq.management.JMSProducerStatsImpl;
032 import org.apache.activemq.management.StatsCapable;
033 import org.apache.activemq.management.StatsImpl;
034 import org.apache.activemq.usage.MemoryUsage;
035 import org.apache.activemq.util.IntrospectionSupport;
036
037 /**
038 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
039 * destination. A <CODE>MessageProducer</CODE> object is created by passing a
040 * <CODE>Destination</CODE> object to a message-producer creation method
041 * supplied by a session.
042 * <P>
043 * <CODE>MessageProducer</CODE> is the parent interface for all message
044 * producers.
045 * <P>
046 * A client also has the option of creating a message producer without supplying
047 * a destination. In this case, a destination must be provided with every send
048 * operation. A typical use for this kind of message producer is to send replies
049 * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
050 * <P>
051 * A client can specify a default delivery mode, priority, and time to live for
052 * messages sent by a message producer. It can also specify the delivery mode,
053 * priority, and time to live for an individual message.
054 * <P>
055 * A client can specify a time-to-live value in milliseconds for each message it
056 * sends. This value defines a message expiration time that is the sum of the
057 * message's time-to-live and the GMT when it is sent (for transacted sends,
058 * this is the time the client sends the message, not the time the transaction
059 * is committed).
060 * <P>
061 * A JMS provider should do its best to expire messages accurately; however, the
062 * JMS API does not define the accuracy provided.
063 *
064 *
065 * @see javax.jms.TopicPublisher
066 * @see javax.jms.QueueSender
067 * @see javax.jms.Session#createProducer
068 */
069 public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
070
071 protected ProducerInfo info;
072 protected boolean closed;
073
074 private final JMSProducerStatsImpl stats;
075 private AtomicLong messageSequence;
076 private final long startTime;
077 private MessageTransformer transformer;
078 private MemoryUsage producerWindow;
079
080 protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
081 super(session);
082 this.info = new ProducerInfo(producerId);
083 this.info.setWindowSize(session.connection.getProducerWindowSize());
084 if (destination != null && destination.getOptions() != null) {
085 Map<String, String> options = new HashMap<String, String>(destination.getOptions());
086 IntrospectionSupport.setProperties(this.info, options, "producer.");
087 }
088 this.info.setDestination(destination);
089
090 // Enable producer window flow control if protocol > 3 and the window
091 // size > 0
092 if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
093 producerWindow = new MemoryUsage("Producer Window: " + producerId);
094 producerWindow.setExecutor(session.getConnectionExecutor());
095 producerWindow.setLimit(this.info.getWindowSize());
096 producerWindow.start();
097 }
098
099 this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
100 this.defaultPriority = Message.DEFAULT_PRIORITY;
101 this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
102 this.startTime = System.currentTimeMillis();
103 this.messageSequence = new AtomicLong(0);
104 this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
105 this.session.addProducer(this);
106 this.session.asyncSendPacket(info);
107 this.setSendTimeout(sendTimeout);
108 setTransformer(session.getTransformer());
109 }
110
111 public StatsImpl getStats() {
112 return stats;
113 }
114
115 public JMSProducerStatsImpl getProducerStats() {
116 return stats;
117 }
118
119 /**
120 * Gets the destination associated with this <CODE>MessageProducer</CODE>.
121 *
122 * @return this producer's <CODE>Destination/ <CODE>
123 * @throws JMSException if the JMS provider fails to close the producer due to
124 * some internal error.
125 * @since 1.1
126 */
127 public Destination getDestination() throws JMSException {
128 checkClosed();
129 return this.info.getDestination();
130 }
131
132 /**
133 * Closes the message producer.
134 * <P>
135 * Since a provider may allocate some resources on behalf of a <CODE>
136 * MessageProducer</CODE>
137 * outside the Java virtual machine, clients should close them when they are
138 * not needed. Relying on garbage collection to eventually reclaim these
139 * resources may not be timely enough.
140 *
141 * @throws JMSException if the JMS provider fails to close the producer due
142 * to some internal error.
143 */
144 public void close() throws JMSException {
145 if (!closed) {
146 dispose();
147 this.session.asyncSendPacket(info.createRemoveCommand());
148 }
149 }
150
151 public void dispose() {
152 if (!closed) {
153 this.session.removeProducer(this);
154 if (producerWindow != null) {
155 producerWindow.stop();
156 }
157 closed = true;
158 }
159 }
160
161 /**
162 * Check if the instance of this producer has been closed.
163 *
164 * @throws IllegalStateException
165 */
166 @Override
167 protected void checkClosed() throws IllegalStateException {
168 if (closed) {
169 throw new IllegalStateException("The producer is closed");
170 }
171 }
172
173 /**
174 * Sends a message to a destination for an unidentified message producer,
175 * specifying delivery mode, priority and time to live.
176 * <P>
177 * Typically, a message producer is assigned a destination at creation time;
178 * however, the JMS API also supports unidentified message producers, which
179 * require that the destination be supplied every time a message is sent.
180 *
181 * @param destination the destination to send this message to
182 * @param message the message to send
183 * @param deliveryMode the delivery mode to use
184 * @param priority the priority for this message
185 * @param timeToLive the message's lifetime (in milliseconds)
186 * @throws JMSException if the JMS provider fails to send the message due to
187 * some internal error.
188 * @throws UnsupportedOperationException if an invalid destination is
189 * specified.
190 * @throws InvalidDestinationException if a client uses this method with an
191 * invalid destination.
192 * @see javax.jms.Session#createProducer
193 * @since 1.1
194 */
195 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
196 checkClosed();
197 if (destination == null) {
198 if (info.getDestination() == null) {
199 throw new UnsupportedOperationException("A destination must be specified.");
200 }
201 throw new InvalidDestinationException("Don't understand null destinations");
202 }
203
204 ActiveMQDestination dest;
205 if (destination == info.getDestination()) {
206 dest = (ActiveMQDestination)destination;
207 } else if (info.getDestination() == null) {
208 dest = ActiveMQDestination.transform(destination);
209 } else {
210 throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
211 }
212 if (dest == null) {
213 throw new JMSException("No destination specified");
214 }
215
216 if (transformer != null) {
217 Message transformedMessage = transformer.producerTransform(session, this, message);
218 if (transformedMessage != null) {
219 message = transformedMessage;
220 }
221 }
222
223 if (producerWindow != null) {
224 try {
225 producerWindow.waitForSpace();
226 } catch (InterruptedException e) {
227 throw new JMSException("Send aborted due to thread interrupt.");
228 }
229 }
230
231 this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
232
233 stats.onMessage();
234 }
235
236 public MessageTransformer getTransformer() {
237 return transformer;
238 }
239
240 /**
241 * Sets the transformer used to transform messages before they are sent on
242 * to the JMS bus
243 */
244 public void setTransformer(MessageTransformer transformer) {
245 this.transformer = transformer;
246 }
247
248 /**
249 * @return the time in milli second when this object was created.
250 */
251 protected long getStartTime() {
252 return this.startTime;
253 }
254
255 /**
256 * @return Returns the messageSequence.
257 */
258 protected long getMessageSequence() {
259 return messageSequence.incrementAndGet();
260 }
261
262 /**
263 * @param messageSequence The messageSequence to set.
264 */
265 protected void setMessageSequence(AtomicLong messageSequence) {
266 this.messageSequence = messageSequence;
267 }
268
269 /**
270 * @return Returns the info.
271 */
272 protected ProducerInfo getProducerInfo() {
273 return this.info != null ? this.info : null;
274 }
275
276 /**
277 * @param info The info to set
278 */
279 protected void setProducerInfo(ProducerInfo info) {
280 this.info = info;
281 }
282
283 @Override
284 public String toString() {
285 return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
286 }
287
288 public void onProducerAck(ProducerAck pa) {
289 if (this.producerWindow != null) {
290 this.producerWindow.decreaseUsage(pa.getSize());
291 }
292 }
293
294 }