001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import java.io.IOException;
020 import java.io.OutputStreamWriter;
021 import java.io.PrintWriter;
022 import java.util.HashMap;
023 import java.util.Iterator;
024 import java.util.Map;
025 import java.util.concurrent.ConcurrentHashMap;
026 import java.util.concurrent.atomic.AtomicBoolean;
027
028 import javax.jms.JMSException;
029
030 import org.apache.activemq.broker.BrokerContext;
031 import org.apache.activemq.broker.BrokerContextAware;
032 import org.apache.activemq.command.ActiveMQDestination;
033 import org.apache.activemq.command.ActiveMQMessage;
034 import org.apache.activemq.command.ActiveMQTempQueue;
035 import org.apache.activemq.command.ActiveMQTempTopic;
036 import org.apache.activemq.command.Command;
037 import org.apache.activemq.command.ConnectionError;
038 import org.apache.activemq.command.ConnectionId;
039 import org.apache.activemq.command.ConnectionInfo;
040 import org.apache.activemq.command.ConsumerId;
041 import org.apache.activemq.command.ConsumerInfo;
042 import org.apache.activemq.command.DestinationInfo;
043 import org.apache.activemq.command.ExceptionResponse;
044 import org.apache.activemq.command.LocalTransactionId;
045 import org.apache.activemq.command.MessageAck;
046 import org.apache.activemq.command.MessageDispatch;
047 import org.apache.activemq.command.MessageId;
048 import org.apache.activemq.command.ProducerId;
049 import org.apache.activemq.command.ProducerInfo;
050 import org.apache.activemq.command.RemoveSubscriptionInfo;
051 import org.apache.activemq.command.Response;
052 import org.apache.activemq.command.SessionId;
053 import org.apache.activemq.command.SessionInfo;
054 import org.apache.activemq.command.ShutdownInfo;
055 import org.apache.activemq.command.TransactionId;
056 import org.apache.activemq.command.TransactionInfo;
057 import org.apache.activemq.util.ByteArrayOutputStream;
058 import org.apache.activemq.util.FactoryFinder;
059 import org.apache.activemq.util.IOExceptionSupport;
060 import org.apache.activemq.util.IdGenerator;
061 import org.apache.activemq.util.IntrospectionSupport;
062 import org.apache.activemq.util.LongSequenceGenerator;
063 import org.slf4j.Logger;
064 import org.slf4j.LoggerFactory;
065 import org.springframework.context.ApplicationContextAware;
066
067 /**
068 * @author <a href="http://hiramchirino.com">chirino</a>
069 */
070 public class ProtocolConverter {
071
072 private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);
073
074 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
075
076 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
077 private final SessionId sessionId = new SessionId(connectionId, -1);
078 private final ProducerId producerId = new ProducerId(sessionId, 1);
079
080 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
081 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
082 private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
083 private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
084
085 private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
086 private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
087 private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
088 private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
089 private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
090 private final StompTransport stompTransport;
091
092 private final Object commnadIdMutex = new Object();
093 private int lastCommandId;
094 private final AtomicBoolean connected = new AtomicBoolean(false);
095 private final FrameTranslator frameTranslator;
096 private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
097 private final BrokerContext brokerContext;
098
099 public ProtocolConverter(StompTransport stompTransport, FrameTranslator translator, BrokerContext brokerContext) {
100 this.stompTransport = stompTransport;
101 this.frameTranslator = translator;
102 this.brokerContext = brokerContext;
103 }
104
105 protected int generateCommandId() {
106 synchronized (commnadIdMutex) {
107 return lastCommandId++;
108 }
109 }
110
111 protected ResponseHandler createResponseHandler(final StompFrame command) {
112 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
113 if (receiptId != null) {
114 return new ResponseHandler() {
115 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
116 if (response.isException()) {
117 // Generally a command can fail.. but that does not invalidate the connection.
118 // We report back the failure but we don't close the connection.
119 Throwable exception = ((ExceptionResponse)response).getException();
120 handleException(exception, command);
121 } else {
122 StompFrame sc = new StompFrame();
123 sc.setAction(Stomp.Responses.RECEIPT);
124 sc.setHeaders(new HashMap<String, String>(1));
125 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
126 stompTransport.sendToStomp(sc);
127 }
128 }
129 };
130 }
131 return null;
132 }
133
134 protected void sendToActiveMQ(Command command, ResponseHandler handler) {
135 command.setCommandId(generateCommandId());
136 if (handler != null) {
137 command.setResponseRequired(true);
138 resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
139 }
140 stompTransport.sendToActiveMQ(command);
141 }
142
143 protected void sendToStomp(StompFrame command) throws IOException {
144 stompTransport.sendToStomp(command);
145 }
146
147 protected FrameTranslator findTranslator(String header) {
148 FrameTranslator translator = frameTranslator;
149 try {
150 if (header != null) {
151 translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
152 .newInstance(header);
153 if (translator instanceof BrokerContextAware) {
154 ((BrokerContextAware)translator).setBrokerContext(brokerContext);
155 }
156 }
157 } catch (Exception ignore) {
158 // if anything goes wrong use the default translator
159 }
160
161 return translator;
162 }
163
164 /**
165 * Convert a stomp command
166 *
167 * @param command
168 */
169 public void onStompCommand(StompFrame command) throws IOException, JMSException {
170 try {
171
172 if (command.getClass() == StompFrameError.class) {
173 throw ((StompFrameError)command).getException();
174 }
175
176 String action = command.getAction();
177 if (action.startsWith(Stomp.Commands.SEND)) {
178 onStompSend(command);
179 } else if (action.startsWith(Stomp.Commands.ACK)) {
180 onStompAck(command);
181 } else if (action.startsWith(Stomp.Commands.BEGIN)) {
182 onStompBegin(command);
183 } else if (action.startsWith(Stomp.Commands.COMMIT)) {
184 onStompCommit(command);
185 } else if (action.startsWith(Stomp.Commands.ABORT)) {
186 onStompAbort(command);
187 } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
188 onStompSubscribe(command);
189 } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
190 onStompUnsubscribe(command);
191 } else if (action.startsWith(Stomp.Commands.CONNECT)) {
192 onStompConnect(command);
193 } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
194 onStompDisconnect(command);
195 } else {
196 throw new ProtocolException("Unknown STOMP action: " + action);
197 }
198
199 } catch (ProtocolException e) {
200 handleException(e, command);
201 // Some protocol errors can cause the connection to get closed.
202 if( e.isFatal() ) {
203 getStompTransport().onException(e);
204 }
205 }
206 }
207
208 protected void handleException(Throwable exception, StompFrame command) throws IOException {
209 LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
210 if (LOG.isDebugEnabled()) {
211 LOG.debug("Exception detail", exception);
212 }
213
214 // Let the stomp client know about any protocol errors.
215 ByteArrayOutputStream baos = new ByteArrayOutputStream();
216 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
217 exception.printStackTrace(stream);
218 stream.close();
219
220 HashMap<String, String> headers = new HashMap<String, String>();
221 headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
222
223 if (command != null) {
224 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
225 if (receiptId != null) {
226 headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
227 }
228 }
229
230 StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
231 sendToStomp(errorMessage);
232 }
233
234 protected void onStompSend(StompFrame command) throws IOException, JMSException {
235 checkConnected();
236
237 Map<String, String> headers = command.getHeaders();
238 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
239 headers.remove("transaction");
240
241 ActiveMQMessage message = convertMessage(command);
242
243 message.setProducerId(producerId);
244 MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
245 message.setMessageId(id);
246 message.setJMSTimestamp(System.currentTimeMillis());
247
248 if (stompTx != null) {
249 TransactionId activemqTx = transactions.get(stompTx);
250 if (activemqTx == null) {
251 throw new ProtocolException("Invalid transaction id: " + stompTx);
252 }
253 message.setTransactionId(activemqTx);
254 }
255
256 message.onSend();
257 sendToActiveMQ(message, createResponseHandler(command));
258
259 }
260
261 protected void onStompAck(StompFrame command) throws ProtocolException {
262 checkConnected();
263
264 // TODO: acking with just a message id is very bogus
265 // since the same message id could have been sent to 2 different
266 // subscriptions
267 // on the same stomp connection. For example, when 2 subs are created on
268 // the same topic.
269
270 Map<String, String> headers = command.getHeaders();
271 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
272 if (messageId == null) {
273 throw new ProtocolException("ACK received without a message-id to acknowledge!");
274 }
275
276 TransactionId activemqTx = null;
277 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
278 if (stompTx != null) {
279 activemqTx = transactions.get(stompTx);
280 if (activemqTx == null) {
281 throw new ProtocolException("Invalid transaction id: " + stompTx);
282 }
283 }
284
285 boolean acked = false;
286 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
287 StompSubscription sub = iter.next();
288 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
289 if (ack != null) {
290 ack.setTransactionId(activemqTx);
291 sendToActiveMQ(ack, createResponseHandler(command));
292 acked = true;
293 break;
294 }
295 }
296
297 if (!acked) {
298 throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
299 }
300
301 }
302
303 protected void onStompBegin(StompFrame command) throws ProtocolException {
304 checkConnected();
305
306 Map<String, String> headers = command.getHeaders();
307
308 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
309
310 if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
311 throw new ProtocolException("Must specify the transaction you are beginning");
312 }
313
314 if (transactions.get(stompTx) != null) {
315 throw new ProtocolException("The transaction was allready started: " + stompTx);
316 }
317
318 LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
319 transactions.put(stompTx, activemqTx);
320
321 TransactionInfo tx = new TransactionInfo();
322 tx.setConnectionId(connectionId);
323 tx.setTransactionId(activemqTx);
324 tx.setType(TransactionInfo.BEGIN);
325
326 sendToActiveMQ(tx, createResponseHandler(command));
327
328 }
329
330 protected void onStompCommit(StompFrame command) throws ProtocolException {
331 checkConnected();
332
333 Map<String, String> headers = command.getHeaders();
334
335 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
336 if (stompTx == null) {
337 throw new ProtocolException("Must specify the transaction you are committing");
338 }
339
340 TransactionId activemqTx = transactions.remove(stompTx);
341 if (activemqTx == null) {
342 throw new ProtocolException("Invalid transaction id: " + stompTx);
343 }
344
345 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
346 StompSubscription sub = iter.next();
347 sub.onStompCommit(activemqTx);
348 }
349
350 TransactionInfo tx = new TransactionInfo();
351 tx.setConnectionId(connectionId);
352 tx.setTransactionId(activemqTx);
353 tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
354
355 sendToActiveMQ(tx, createResponseHandler(command));
356
357 }
358
359 protected void onStompAbort(StompFrame command) throws ProtocolException {
360 checkConnected();
361 Map<String, String> headers = command.getHeaders();
362
363 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
364 if (stompTx == null) {
365 throw new ProtocolException("Must specify the transaction you are committing");
366 }
367
368 TransactionId activemqTx = transactions.remove(stompTx);
369 if (activemqTx == null) {
370 throw new ProtocolException("Invalid transaction id: " + stompTx);
371 }
372 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
373 StompSubscription sub = iter.next();
374 try {
375 sub.onStompAbort(activemqTx);
376 } catch (Exception e) {
377 throw new ProtocolException("Transaction abort failed", false, e);
378 }
379 }
380
381 TransactionInfo tx = new TransactionInfo();
382 tx.setConnectionId(connectionId);
383 tx.setTransactionId(activemqTx);
384 tx.setType(TransactionInfo.ROLLBACK);
385
386 sendToActiveMQ(tx, createResponseHandler(command));
387
388 }
389
390 protected void onStompSubscribe(StompFrame command) throws ProtocolException {
391 checkConnected();
392 FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
393 Map<String, String> headers = command.getHeaders();
394
395 String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
396 String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
397
398 ActiveMQDestination actualDest = translator.convertDestination(this, destination);
399
400 if (actualDest == null) {
401 throw new ProtocolException("Invalid Destination.");
402 }
403
404 ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
405 ConsumerInfo consumerInfo = new ConsumerInfo(id);
406 consumerInfo.setPrefetchSize(1000);
407 consumerInfo.setDispatchAsync(true);
408
409 String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
410 consumerInfo.setSelector(selector);
411
412 IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
413
414 consumerInfo.setDestination(translator.convertDestination(this, destination));
415
416 StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
417 stompSubscription.setDestination(actualDest);
418
419 String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
420 if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
421 stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
422 } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
423 stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
424 } else {
425 stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
426 }
427
428 subscriptionsByConsumerId.put(id, stompSubscription);
429 sendToActiveMQ(consumerInfo, createResponseHandler(command));
430
431 }
432
433 protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
434 checkConnected();
435 Map<String, String> headers = command.getHeaders();
436
437 ActiveMQDestination destination = null;
438 Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
439 if (o != null) {
440 destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o);
441 }
442
443 String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
444
445 if (subscriptionId == null && destination == null) {
446 throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
447 }
448
449 // check if it is a durable subscription
450 String durable = command.getHeaders().get("activemq.subscriptionName");
451 if (durable != null) {
452 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
453 info.setClientId(durable);
454 info.setSubscriptionName(durable);
455 info.setConnectionId(connectionId);
456 sendToActiveMQ(info, createResponseHandler(command));
457 return;
458 }
459
460 // TODO: Unsubscribing using a destination is a bit wierd if multiple
461 // subscriptions
462 // are created with the same destination. Perhaps this should be
463 // removed.
464 //
465 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
466 StompSubscription sub = iter.next();
467 if ((subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId())) || (destination != null && destination.equals(sub.getDestination()))) {
468 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
469 iter.remove();
470 return;
471 }
472 }
473
474 throw new ProtocolException("No subscription matched.");
475 }
476
477 ConnectionInfo connectionInfo = new ConnectionInfo();
478
479 protected void onStompConnect(final StompFrame command) throws ProtocolException {
480
481 if (connected.get()) {
482 throw new ProtocolException("Allready connected.");
483 }
484
485 final Map<String, String> headers = command.getHeaders();
486
487 // allow anyone to login for now
488 String login = headers.get(Stomp.Headers.Connect.LOGIN);
489 String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
490 String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
491
492
493 IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
494
495 connectionInfo.setConnectionId(connectionId);
496 if (clientId != null) {
497 connectionInfo.setClientId(clientId);
498 } else {
499 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
500 }
501
502 connectionInfo.setResponseRequired(true);
503 connectionInfo.setUserName(login);
504 connectionInfo.setPassword(passcode);
505 connectionInfo.setTransportContext(stompTransport.getPeerCertificates());
506
507 sendToActiveMQ(connectionInfo, new ResponseHandler() {
508 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
509
510 if (response.isException()) {
511 // If the connection attempt fails we close the socket.
512 Throwable exception = ((ExceptionResponse)response).getException();
513 handleException(exception, command);
514 getStompTransport().onException(IOExceptionSupport.create(exception));
515 return;
516 }
517
518 final SessionInfo sessionInfo = new SessionInfo(sessionId);
519 sendToActiveMQ(sessionInfo, null);
520
521 final ProducerInfo producerInfo = new ProducerInfo(producerId);
522 sendToActiveMQ(producerInfo, new ResponseHandler() {
523 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
524
525 if (response.isException()) {
526 // If the connection attempt fails we close the socket.
527 Throwable exception = ((ExceptionResponse)response).getException();
528 handleException(exception, command);
529 getStompTransport().onException(IOExceptionSupport.create(exception));
530 }
531
532 connected.set(true);
533 HashMap<String, String> responseHeaders = new HashMap<String, String>();
534
535 responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
536 String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
537 if (requestId == null) {
538 // TODO legacy
539 requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
540 }
541 if (requestId != null) {
542 // TODO legacy
543 responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
544 responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
545 }
546
547 StompFrame sc = new StompFrame();
548 sc.setAction(Stomp.Responses.CONNECTED);
549 sc.setHeaders(responseHeaders);
550 sendToStomp(sc);
551 }
552 });
553
554 }
555 });
556 }
557
558 protected void onStompDisconnect(StompFrame command) throws ProtocolException {
559 checkConnected();
560 sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
561 sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
562 connected.set(false);
563 }
564
565 protected void checkConnected() throws ProtocolException {
566 if (!connected.get()) {
567 throw new ProtocolException("Not connected.");
568 }
569 }
570
571 /**
572 * Dispatch a ActiveMQ command
573 *
574 * @param command
575 * @throws IOException
576 */
577 public void onActiveMQCommand(Command command) throws IOException, JMSException {
578 if (command.isResponse()) {
579
580 Response response = (Response)command;
581 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
582 if (rh != null) {
583 rh.onResponse(this, response);
584 } else {
585 // Pass down any unexpected errors. Should this close the connection?
586 if (response.isException()) {
587 Throwable exception = ((ExceptionResponse)response).getException();
588 handleException(exception, null);
589 }
590 }
591 } else if (command.isMessageDispatch()) {
592
593 MessageDispatch md = (MessageDispatch)command;
594 StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
595 if (sub != null) {
596 sub.onMessageDispatch(md);
597 }
598 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
599 // Pass down any unexpected async errors. Should this close the connection?
600 Throwable exception = ((ConnectionError)command).getException();
601 handleException(exception, null);
602 }
603 }
604
605 public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
606 ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
607 return msg;
608 }
609
610 public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
611 if (ignoreTransformation == true) {
612 return frameTranslator.convertMessage(this, message);
613 } else {
614 return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
615 }
616 }
617
618 public StompTransport getStompTransport() {
619 return stompTransport;
620 }
621
622 public ActiveMQDestination createTempQueue(String name) {
623 ActiveMQDestination rc = tempDestinations.get(name);
624 if( rc == null ) {
625 rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
626 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
627 tempDestinations.put(name, rc);
628 }
629 return rc;
630 }
631
632 public ActiveMQDestination createTempTopic(String name) {
633 ActiveMQDestination rc = tempDestinations.get(name);
634 if( rc == null ) {
635 rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
636 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
637 tempDestinations.put(name, rc);
638 tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
639 }
640 return rc;
641 }
642
643 public String getCreatedTempDestinationName(ActiveMQDestination destination) {
644 return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
645 }
646 }