001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.openwire;
018
019 import java.io.DataInput;
020 import java.io.DataOutput;
021 import java.io.IOException;
022 import java.lang.reflect.Method;
023 import java.util.HashMap;
024 import java.util.Map;
025
026 import org.apache.activemq.command.CommandTypes;
027 import org.apache.activemq.command.DataStructure;
028 import org.apache.activemq.command.WireFormatInfo;
029 import org.apache.activemq.util.ByteSequence;
030 import org.apache.activemq.util.ByteSequenceData;
031 import org.apache.activemq.util.DataByteArrayInputStream;
032 import org.apache.activemq.util.DataByteArrayOutputStream;
033 import org.apache.activemq.wireformat.WireFormat;
034
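/**
 * {@link WireFormat} implementation for the OpenWire protocol: marshals and
 * unmarshals {@link DataStructure} commands using the
 * {@link DataStreamMarshaller} table of the configured protocol version, in
 * either tight or loose encoding.
 */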
039 public final class OpenWireFormat implements WireFormat {
040
041 public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
042
043 static final byte NULL_TYPE = CommandTypes.NULL;
044 private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
045 private static final int MARSHAL_CACHE_FREE_SPACE = 100;
046
047 private DataStreamMarshaller dataMarshallers[];
048 private int version;
049 private boolean stackTraceEnabled;
050 private boolean tcpNoDelayEnabled;
051 private boolean cacheEnabled;
052 private boolean tightEncodingEnabled;
053 private boolean sizePrefixDisabled;
054
055 // The following fields are used for value caching
056 private short nextMarshallCacheIndex;
057 private short nextMarshallCacheEvictionIndex;
058 private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
059 private DataStructure marshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
060 private DataStructure unmarshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
061 private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
062 private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
063 private WireFormatInfo preferedWireFormatInfo;
064
/**
 * Creates a wire format for the given protocol version.
 *
 * @param i the OpenWire protocol version to use
 * @throws IllegalArgumentException if no marshaller factory can be loaded
 *             for that version (see {@link #setVersion(int)})
 */
public OpenWireFormat(int i) {
    this.setVersion(i);
}

/**
 * Creates a wire format that uses {@link #DEFAULT_VERSION}.
 */
public OpenWireFormat() {
    this(DEFAULT_VERSION);
}
072
073 public int hashCode() {
074 return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
075 ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
076 ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
077 ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
078 }
079
080 public OpenWireFormat copy() {
081 OpenWireFormat answer = new OpenWireFormat();
082 answer.version = version;
083 answer.stackTraceEnabled = stackTraceEnabled;
084 answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
085 answer.cacheEnabled = cacheEnabled;
086 answer.tightEncodingEnabled = tightEncodingEnabled;
087 answer.sizePrefixDisabled = sizePrefixDisabled;
088 answer.preferedWireFormatInfo = preferedWireFormatInfo;
089 return answer;
090 }
091
092 public boolean equals(Object object) {
093 if (object == null) {
094 return false;
095 }
096 OpenWireFormat o = (OpenWireFormat)object;
097 return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
098 && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
099 && o.sizePrefixDisabled == sizePrefixDisabled;
100 }
101
102
103 public String toString() {
104 return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
105 + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
106 // return "OpenWireFormat{id="+id+",
107 // tightEncodingEnabled="+tightEncodingEnabled+"}";
108 }
109
110 public int getVersion() {
111 return version;
112 }
113
114 public synchronized ByteSequence marshal(Object command) throws IOException {
115
116 if (cacheEnabled) {
117 runMarshallCacheEvictionSweep();
118 }
119
120 // MarshallAware ma = null;
121 // // If not using value caching, then the marshaled form is always the
122 // // same
123 // if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
124 // ma = (MarshallAware)command;
125 // }
126
127 ByteSequence sequence = null;
128 // if( ma!=null ) {
129 // sequence = ma.getCachedMarshalledForm(this);
130 // }
131
132 if (sequence == null) {
133
134 int size = 1;
135 if (command != null) {
136
137 DataStructure c = (DataStructure)command;
138 byte type = c.getDataStructureType();
139 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
140 if (dsm == null) {
141 throw new IOException("Unknown data type: " + type);
142 }
143 if (tightEncodingEnabled) {
144
145 BooleanStream bs = new BooleanStream();
146 size += dsm.tightMarshal1(this, c, bs);
147 size += bs.marshalledSize();
148
149 bytesOut.restart(size);
150 if (!sizePrefixDisabled) {
151 bytesOut.writeInt(size);
152 }
153 bytesOut.writeByte(type);
154 bs.marshal(bytesOut);
155 dsm.tightMarshal2(this, c, bytesOut, bs);
156 sequence = bytesOut.toByteSequence();
157
158 } else {
159 bytesOut.restart();
160 if (!sizePrefixDisabled) {
161 bytesOut.writeInt(0); // we don't know the final size
162 // yet but write this here for
163 // now.
164 }
165 bytesOut.writeByte(type);
166 dsm.looseMarshal(this, c, bytesOut);
167 sequence = bytesOut.toByteSequence();
168
169 if (!sizePrefixDisabled) {
170 size = sequence.getLength() - 4;
171 int pos = sequence.offset;
172 ByteSequenceData.writeIntBig(sequence, size);
173 sequence.offset = pos;
174 }
175 }
176
177 } else {
178 bytesOut.restart(5);
179 bytesOut.writeInt(size);
180 bytesOut.writeByte(NULL_TYPE);
181 sequence = bytesOut.toByteSequence();
182 }
183
184 // if( ma!=null ) {
185 // ma.setCachedMarshalledForm(this, sequence);
186 // }
187 }
188 return sequence;
189 }
190
191 public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
192 bytesIn.restart(sequence);
193 // DataInputStream dis = new DataInputStream(new
194 // ByteArrayInputStream(sequence));
195
196 if (!sizePrefixDisabled) {
197 int size = bytesIn.readInt();
198 if (sequence.getLength() - 4 != size) {
199 // throw new IOException("Packet size does not match marshaled
200 // size");
201 }
202 }
203
204 Object command = doUnmarshal(bytesIn);
205 // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
206 // ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
207 // }
208 return command;
209 }
210
211 public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
212
213 if (cacheEnabled) {
214 runMarshallCacheEvictionSweep();
215 }
216
217 int size = 1;
218 if (o != null) {
219
220 DataStructure c = (DataStructure)o;
221 byte type = c.getDataStructureType();
222 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
223 if (dsm == null) {
224 throw new IOException("Unknown data type: " + type);
225 }
226 if (tightEncodingEnabled) {
227 BooleanStream bs = new BooleanStream();
228 size += dsm.tightMarshal1(this, c, bs);
229 size += bs.marshalledSize();
230
231 if (!sizePrefixDisabled) {
232 dataOut.writeInt(size);
233 }
234
235 dataOut.writeByte(type);
236 bs.marshal(dataOut);
237 dsm.tightMarshal2(this, c, dataOut, bs);
238
239 } else {
240 DataOutput looseOut = dataOut;
241
242 if (!sizePrefixDisabled) {
243 bytesOut.restart();
244 looseOut = bytesOut;
245 }
246
247 looseOut.writeByte(type);
248 dsm.looseMarshal(this, c, looseOut);
249
250 if (!sizePrefixDisabled) {
251 ByteSequence sequence = bytesOut.toByteSequence();
252 dataOut.writeInt(sequence.getLength());
253 dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
254 }
255
256 }
257
258 } else {
259 if (!sizePrefixDisabled) {
260 dataOut.writeInt(size);
261 }
262 dataOut.writeByte(NULL_TYPE);
263 }
264 }
265
266 public Object unmarshal(DataInput dis) throws IOException {
267 DataInput dataIn = dis;
268 if (!sizePrefixDisabled) {
269 dis.readInt();
270 // int size = dis.readInt();
271 // byte[] data = new byte[size];
272 // dis.readFully(data);
273 // bytesIn.restart(data);
274 // dataIn = bytesIn;
275 }
276 return doUnmarshal(dataIn);
277 }
278
279 /**
280 * Used by NIO or AIO transports
281 */
282 public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
283 int size = 1;
284 if (o != null) {
285 DataStructure c = (DataStructure)o;
286 byte type = c.getDataStructureType();
287 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
288 if (dsm == null) {
289 throw new IOException("Unknown data type: " + type);
290 }
291
292 size += dsm.tightMarshal1(this, c, bs);
293 size += bs.marshalledSize();
294 }
295 return size;
296 }
297
298 /**
299 * Used by NIO or AIO transports; note that the size is not written as part
300 * of this method.
301 */
302 public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
303 if (cacheEnabled) {
304 runMarshallCacheEvictionSweep();
305 }
306
307 if (o != null) {
308 DataStructure c = (DataStructure)o;
309 byte type = c.getDataStructureType();
310 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
311 if (dsm == null) {
312 throw new IOException("Unknown data type: " + type);
313 }
314 ds.writeByte(type);
315 bs.marshal(ds);
316 dsm.tightMarshal2(this, c, ds, bs);
317 }
318 }
319
320 /**
321 * Allows you to dynamically switch the version of the openwire protocol
322 * being used.
323 *
324 * @param version
325 */
326 public void setVersion(int version) {
327 String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
328 Class mfClass;
329 try {
330 mfClass = Class.forName(mfName, false, getClass().getClassLoader());
331 } catch (ClassNotFoundException e) {
332 throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
333 + ", could not load " + mfName)
334 .initCause(e);
335 }
336 try {
337 Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
338 dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
339 } catch (Throwable e) {
340 throw (IllegalArgumentException)new IllegalArgumentException(
341 "Invalid version: "
342 + version
343 + ", "
344 + mfName
345 + " does not properly implement the createMarshallerMap method.")
346 .initCause(e);
347 }
348 this.version = version;
349 }
350
351 public Object doUnmarshal(DataInput dis) throws IOException {
352 byte dataType = dis.readByte();
353 if (dataType != NULL_TYPE) {
354 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
355 if (dsm == null) {
356 throw new IOException("Unknown data type: " + dataType);
357 }
358 Object data = dsm.createObject();
359 if (this.tightEncodingEnabled) {
360 BooleanStream bs = new BooleanStream();
361 bs.unmarshal(dis);
362 dsm.tightUnmarshal(this, data, dis, bs);
363 } else {
364 dsm.looseUnmarshal(this, data, dis);
365 }
366 return data;
367 } else {
368 return null;
369 }
370 }
371
372 // public void debug(String msg) {
373 // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
374 // System.out.println(t+": "+msg);
375 // }
376 public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
377 bs.writeBoolean(o != null);
378 if (o == null) {
379 return 0;
380 }
381
382 if (o.isMarshallAware()) {
383 // MarshallAware ma = (MarshallAware)o;
384 ByteSequence sequence = null;
385 // sequence=ma.getCachedMarshalledForm(this);
386 bs.writeBoolean(sequence != null);
387 if (sequence != null) {
388 return 1 + sequence.getLength();
389 }
390 }
391
392 byte type = o.getDataStructureType();
393 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
394 if (dsm == null) {
395 throw new IOException("Unknown data type: " + type);
396 }
397 return 1 + dsm.tightMarshal1(this, o, bs);
398 }
399
400 public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
401 throws IOException {
402 if (!bs.readBoolean()) {
403 return;
404 }
405
406 byte type = o.getDataStructureType();
407 ds.writeByte(type);
408
409 if (o.isMarshallAware() && bs.readBoolean()) {
410
411 // We should not be doing any caching
412 throw new IOException("Corrupted stream");
413 // MarshallAware ma = (MarshallAware) o;
414 // ByteSequence sequence=ma.getCachedMarshalledForm(this);
415 // ds.write(sequence.getData(), sequence.getOffset(),
416 // sequence.getLength());
417
418 } else {
419
420 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
421 if (dsm == null) {
422 throw new IOException("Unknown data type: " + type);
423 }
424 dsm.tightMarshal2(this, o, ds, bs);
425
426 }
427 }
428
429 public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
430 if (bs.readBoolean()) {
431
432 byte dataType = dis.readByte();
433 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
434 if (dsm == null) {
435 throw new IOException("Unknown data type: " + dataType);
436 }
437 DataStructure data = dsm.createObject();
438
439 if (data.isMarshallAware() && bs.readBoolean()) {
440
441 dis.readInt();
442 dis.readByte();
443
444 BooleanStream bs2 = new BooleanStream();
445 bs2.unmarshal(dis);
446 dsm.tightUnmarshal(this, data, dis, bs2);
447
448 // TODO: extract the sequence from the dis and associate it.
449 // MarshallAware ma = (MarshallAware)data
450 // ma.setCachedMarshalledForm(this, sequence);
451
452 } else {
453 dsm.tightUnmarshal(this, data, dis, bs);
454 }
455
456 return data;
457 } else {
458 return null;
459 }
460 }
461
462 public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
463 if (dis.readBoolean()) {
464
465 byte dataType = dis.readByte();
466 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
467 if (dsm == null) {
468 throw new IOException("Unknown data type: " + dataType);
469 }
470 DataStructure data = dsm.createObject();
471 dsm.looseUnmarshal(this, data, dis);
472 return data;
473
474 } else {
475 return null;
476 }
477 }
478
479 public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
480 dataOut.writeBoolean(o != null);
481 if (o != null) {
482 byte type = o.getDataStructureType();
483 dataOut.writeByte(type);
484 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
485 if (dsm == null) {
486 throw new IOException("Unknown data type: " + type);
487 }
488 dsm.looseMarshal(this, o, dataOut);
489 }
490 }
491
492 public void runMarshallCacheEvictionSweep() {
493 // Do we need to start evicting??
494 while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {
495
496 marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
497 marshallCache[nextMarshallCacheEvictionIndex] = null;
498
499 nextMarshallCacheEvictionIndex++;
500 if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
501 nextMarshallCacheEvictionIndex = 0;
502 }
503
504 }
505 }
506
507 public Short getMarshallCacheIndex(DataStructure o) {
508 return marshallCacheMap.get(o);
509 }
510
511 public Short addToMarshallCache(DataStructure o) {
512 short i = nextMarshallCacheIndex++;
513 if (nextMarshallCacheIndex >= marshallCache.length) {
514 nextMarshallCacheIndex = 0;
515 }
516
517 // We can only cache that item if there is space left.
518 if (marshallCacheMap.size() < marshallCache.length) {
519 marshallCache[i] = o;
520 Short index = new Short(i);
521 marshallCacheMap.put(o, index);
522 return index;
523 } else {
524 // Use -1 to indicate that the value was not cached due to cache
525 // being full.
526 return new Short((short)-1);
527 }
528 }
529
530 public void setInUnmarshallCache(short index, DataStructure o) {
531
532 // There was no space left in the cache, so we can't
533 // put this in the cache.
534 if (index == -1) {
535 return;
536 }
537
538 unmarshallCache[index] = o;
539 }
540
541 public DataStructure getFromUnmarshallCache(short index) {
542 return unmarshallCache[index];
543 }
544
545 public void setStackTraceEnabled(boolean b) {
546 stackTraceEnabled = b;
547 }
548
549 public boolean isStackTraceEnabled() {
550 return stackTraceEnabled;
551 }
552
553 public boolean isTcpNoDelayEnabled() {
554 return tcpNoDelayEnabled;
555 }
556
557 public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
558 this.tcpNoDelayEnabled = tcpNoDelayEnabled;
559 }
560
561 public boolean isCacheEnabled() {
562 return cacheEnabled;
563 }
564
565 public void setCacheEnabled(boolean cacheEnabled) {
566 this.cacheEnabled = cacheEnabled;
567 }
568
569 public boolean isTightEncodingEnabled() {
570 return tightEncodingEnabled;
571 }
572
573 public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
574 this.tightEncodingEnabled = tightEncodingEnabled;
575 }
576
577 public boolean isSizePrefixDisabled() {
578 return sizePrefixDisabled;
579 }
580
581 public void setSizePrefixDisabled(boolean prefixPacketSize) {
582 this.sizePrefixDisabled = prefixPacketSize;
583 }
584
585 public void setPreferedWireFormatInfo(WireFormatInfo info) {
586 this.preferedWireFormatInfo = info;
587 }
588
589 public WireFormatInfo getPreferedWireFormatInfo() {
590 return preferedWireFormatInfo;
591 }
592
593 public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
594
595 if (preferedWireFormatInfo == null) {
596 throw new IllegalStateException("Wireformat cannot not be renegotiated.");
597 }
598
599 this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
600 info.setVersion(this.getVersion());
601
602 this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
603 info.setStackTraceEnabled(this.stackTraceEnabled);
604
605 this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
606 info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
607
608 this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
609 info.setCacheEnabled(this.cacheEnabled);
610
611 this.tightEncodingEnabled = info.isTightEncodingEnabled()
612 && preferedWireFormatInfo.isTightEncodingEnabled();
613 info.setTightEncodingEnabled(this.tightEncodingEnabled);
614
615 this.sizePrefixDisabled = info.isSizePrefixDisabled()
616 && preferedWireFormatInfo.isSizePrefixDisabled();
617 info.setSizePrefixDisabled(this.sizePrefixDisabled);
618
619 if (cacheEnabled) {
620
621 int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
622 info.setCacheSize(size);
623
624 if (size == 0) {
625 size = MARSHAL_CACHE_SIZE;
626 }
627
628 marshallCache = new DataStructure[size];
629 unmarshallCache = new DataStructure[size];
630 nextMarshallCacheIndex = 0;
631 nextMarshallCacheEvictionIndex = 0;
632 marshallCacheMap = new HashMap<DataStructure, Short>();
633 } else {
634 marshallCache = null;
635 unmarshallCache = null;
636 nextMarshallCacheIndex = 0;
637 nextMarshallCacheEvictionIndex = 0;
638 marshallCacheMap = null;
639 }
640
641 }
642
643 protected int min(int version1, int version2) {
644 if (version1 < version2 && version1 > 0 || version2 <= 0) {
645 return version1;
646 }
647 return version2;
648 }
649 }