001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.io.OutputStream;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.Map;
024
025 import javax.jms.InvalidDestinationException;
026 import javax.jms.JMSException;
027
028 import org.apache.activemq.command.ActiveMQBytesMessage;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.ActiveMQMessage;
031 import org.apache.activemq.command.MessageId;
032 import org.apache.activemq.command.ProducerId;
033 import org.apache.activemq.command.ProducerInfo;
034 import org.apache.activemq.util.IOExceptionSupport;
035
036 /**
037 *
038 */
039 public class ActiveMQOutputStream extends OutputStream implements Disposable {
040
041 protected int count;
042
043 final byte buffer[];
044
045 private final ActiveMQConnection connection;
046 private final Map<String, Object> properties;
047 private final ProducerInfo info;
048
049 private long messageSequence;
050 private boolean closed;
051 private final int deliveryMode;
052 private final int priority;
053 private final long timeToLive;
054
055 /**
056 * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
057 */
058 public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
059
060 public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
061 long timeToLive) throws JMSException {
062 this.connection = connection;
063 this.deliveryMode = deliveryMode;
064 this.priority = priority;
065 this.timeToLive = timeToLive;
066 this.properties = properties == null ? null : new HashMap<String, Object>(properties);
067
068 Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
069 if (chunkSize == null) {
070 chunkSize = 64 * 1024;
071 } else {
072 if (chunkSize < 1) {
073 throw new IllegalArgumentException("Chunk size must be greater then 0");
074 } else {
075 chunkSize *= 1024;
076 }
077 }
078
079 buffer = new byte[chunkSize];
080
081 if (destination == null) {
082 throw new InvalidDestinationException("Don't understand null destinations");
083 }
084
085 this.info = new ProducerInfo(producerId);
086 this.info.setDestination(destination);
087
088 this.connection.addOutputStream(this);
089 this.connection.asyncSendPacket(info);
090 }
091
092 public void close() throws IOException {
093 if (!closed) {
094 flushBuffer();
095 try {
096 // Send an EOS style empty message to signal EOS.
097 send(new ActiveMQMessage(), true);
098 dispose();
099 this.connection.asyncSendPacket(info.createRemoveCommand());
100 } catch (JMSException e) {
101 IOExceptionSupport.create(e);
102 }
103 }
104 }
105
106 public void dispose() {
107 if (!closed) {
108 this.connection.removeOutputStream(this);
109 closed = true;
110 }
111 }
112
113 public synchronized void write(int b) throws IOException {
114 buffer[count++] = (byte) b;
115 if (count == buffer.length) {
116 flushBuffer();
117 }
118 }
119
120 public synchronized void write(byte b[], int off, int len) throws IOException {
121 while (len > 0) {
122 int max = Math.min(len, buffer.length - count);
123 System.arraycopy(b, off, buffer, count, max);
124
125 len -= max;
126 count += max;
127 off += max;
128
129 if (count == buffer.length) {
130 flushBuffer();
131 }
132 }
133 }
134
135 public synchronized void flush() throws IOException {
136 flushBuffer();
137 }
138
139 private void flushBuffer() throws IOException {
140 if (count != 0) {
141 try {
142 ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
143 msg.writeBytes(buffer, 0, count);
144 send(msg, false);
145 } catch (JMSException e) {
146 throw IOExceptionSupport.create(e);
147 }
148 count = 0;
149 }
150 }
151
152 /**
153 * @param msg
154 * @throws JMSException
155 */
156 private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
157 if (properties != null) {
158 for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) {
159 String key = (String) iter.next();
160 Object value = properties.get(key);
161 msg.setObjectProperty(key, value);
162 }
163 }
164 msg.setType("org.apache.activemq.Stream");
165 msg.setGroupID(info.getProducerId().toString());
166 if (eosMessage) {
167 msg.setGroupSequence(-1);
168 } else {
169 msg.setGroupSequence((int) messageSequence);
170 }
171 MessageId id = new MessageId(info.getProducerId(), messageSequence++);
172 connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage);
173 }
174
175 public String toString() {
176 return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
177 }
178
179 }