001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import java.io.IOException;
020 import java.io.Serializable;
021 import java.io.StringReader;
022 import java.io.StringWriter;
023 import java.util.HashMap;
024 import java.util.Map;
025
026 import javax.jms.JMSException;
027
028 import org.apache.activemq.advisory.AdvisorySupport;
029 import org.apache.activemq.broker.BrokerContext;
030 import org.apache.activemq.broker.BrokerContextAware;
031 import org.apache.activemq.command.ActiveMQMapMessage;
032 import org.apache.activemq.command.ActiveMQMessage;
033 import org.apache.activemq.command.ActiveMQObjectMessage;
034 import org.apache.activemq.command.DataStructure;
035 import org.apache.activemq.util.JettisonMappedXmlDriver;
036 import org.codehaus.jettison.mapped.Configuration;
037 import org.springframework.beans.BeansException;
038 import org.springframework.context.ApplicationContext;
039
040 import com.thoughtworks.xstream.XStream;
041 import com.thoughtworks.xstream.io.HierarchicalStreamReader;
042 import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
043 import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
044 import com.thoughtworks.xstream.io.xml.XppReader;
045
046 /**
047 * Frame translator implementation that uses XStream to convert messages to and
048 * from XML and JSON
049 *
050 * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
051 */
052 public class JmsFrameTranslator extends LegacyFrameTranslator implements
053 BrokerContextAware {
054
055 XStream xStream = null;
056 BrokerContext brokerContext;
057
058 public ActiveMQMessage convertFrame(ProtocolConverter converter,
059 StompFrame command) throws JMSException, ProtocolException {
060 Map headers = command.getHeaders();
061 ActiveMQMessage msg;
062 String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
063 if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
064 msg = super.convertFrame(converter, command);
065 } else {
066 HierarchicalStreamReader in;
067
068 try {
069 String text = new String(command.getContent(), "UTF-8");
070 switch (Stomp.Transformations.getValue(transformation)) {
071 case JMS_OBJECT_XML:
072 in = new XppReader(new StringReader(text));
073 msg = createObjectMessage(in);
074 break;
075 case JMS_OBJECT_JSON:
076 in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
077 msg = createObjectMessage(in);
078 break;
079 case JMS_MAP_XML:
080 in = new XppReader(new StringReader(text));
081 msg = createMapMessage(in);
082 break;
083 case JMS_MAP_JSON:
084 in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
085 msg = createMapMessage(in);
086 break;
087 default:
088 throw new Exception("Unkown transformation: " + transformation);
089 }
090 } catch (Throwable e) {
091 command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
092 msg = super.convertFrame(converter, command);
093 }
094 }
095 FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
096 return msg;
097 }
098
099 public StompFrame convertMessage(ProtocolConverter converter,
100 ActiveMQMessage message) throws IOException, JMSException {
101 if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
102 StompFrame command = new StompFrame();
103 command.setAction(Stomp.Responses.MESSAGE);
104 Map<String, String> headers = new HashMap<String, String>(25);
105 command.setHeaders(headers);
106
107 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
108 converter, message, command, this);
109
110 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
111 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
112 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
113 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
114 }
115
116 ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
117 command.setContent(marshall(msg.getObject(),
118 headers.get(Stomp.Headers.TRANSFORMATION))
119 .getBytes("UTF-8"));
120 return command;
121
122 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
123 StompFrame command = new StompFrame();
124 command.setAction(Stomp.Responses.MESSAGE);
125 Map<String, String> headers = new HashMap<String, String>(25);
126 command.setHeaders(headers);
127
128 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
129 converter, message, command, this);
130
131 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
132 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
133 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
134 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
135 }
136
137 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
138 command.setContent(marshall((Serializable)msg.getContentMap(),
139 headers.get(Stomp.Headers.TRANSFORMATION))
140 .getBytes("UTF-8"));
141 return command;
142 } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
143 AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
144
145 StompFrame command = new StompFrame();
146 command.setAction(Stomp.Responses.MESSAGE);
147 Map<String, String> headers = new HashMap<String, String>(25);
148 command.setHeaders(headers);
149
150 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
151 converter, message, command, this);
152
153 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
154 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
155 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
156 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
157 }
158
159 String body = marshallAdvisory(message.getDataStructure(),
160 headers.get(Stomp.Headers.TRANSFORMATION));
161 command.setContent(body.getBytes("UTF-8"));
162 return command;
163 } else {
164 return super.convertMessage(converter, message);
165 }
166 }
167
168 /**
169 * Marshalls the Object to a string using XML or JSON encoding
170 */
171 protected String marshall(Serializable object, String transformation)
172 throws JMSException {
173 StringWriter buffer = new StringWriter();
174 HierarchicalStreamWriter out;
175 if (transformation.toLowerCase().endsWith("json")) {
176 out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
177 } else {
178 out = new PrettyPrintWriter(buffer);
179 }
180 getXStream().marshal(object, out);
181 return buffer.toString();
182 }
183
184 protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
185 ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
186 Object obj = getXStream().unmarshal(in);
187 objMsg.setObject((Serializable) obj);
188 return objMsg;
189 }
190
191 protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
192 ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
193 Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
194 for (String key : map.keySet()) {
195 mapMsg.setObject(key, map.get(key));
196 }
197 return mapMsg;
198 }
199
200 protected String marshallAdvisory(final DataStructure ds, String transformation) {
201
202 StringWriter buffer = new StringWriter();
203 HierarchicalStreamWriter out;
204 if (transformation.toLowerCase().endsWith("json")) {
205 out = new JettisonMappedXmlDriver().createWriter(buffer);
206 } else {
207 out = new PrettyPrintWriter(buffer);
208 }
209
210 XStream xstream = getXStream();
211 xstream.setMode(XStream.NO_REFERENCES);
212 xstream.aliasPackage("", "org.apache.activemq.command");
213 xstream.marshal(ds, out);
214 return buffer.toString();
215 }
216
217 // Properties
218 // -------------------------------------------------------------------------
219 public XStream getXStream() {
220 if (xStream == null) {
221 xStream = createXStream();
222 }
223 return xStream;
224 }
225
226 public void setXStream(XStream xStream) {
227 this.xStream = xStream;
228 }
229
230 // Implementation methods
231 // -------------------------------------------------------------------------
232 protected XStream createXStream() {
233 XStream xstream = null;
234 if (brokerContext != null) {
235 Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
236 for (XStream bean : beans.values()) {
237 if (bean != null) {
238 xstream = bean;
239 break;
240 }
241 }
242 }
243
244 if (xstream == null) {
245 xstream = new XStream();
246 }
247 return xstream;
248
249 }
250
251 public void setBrokerContext(BrokerContext brokerContext) {
252 this.brokerContext = brokerContext;
253 }
254
255 }