001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.nio;
018
019 import java.io.EOFException;
020 import java.io.IOException;
021 import java.io.InterruptedIOException;
022 import java.io.OutputStream;
023 import java.nio.ByteBuffer;
024 import java.nio.channels.WritableByteChannel;
025
026 import org.apache.activemq.transport.tcp.TimeStampStream;
027
028 /**
029 * An optimized buffered outputstream for Tcp
030 *
031 *
032 */
033
034 public class NIOOutputStream extends OutputStream implements TimeStampStream {
035
036 private static final int BUFFER_SIZE = 8192;
037
038 private final WritableByteChannel out;
039 private final byte[] buffer;
040 private final ByteBuffer byteBuffer;
041
042 private int count;
043 private boolean closed;
044 private volatile long writeTimestamp = -1;//concurrent reads of this value
045
046 /**
047 * Constructor
048 *
049 * @param out
050 */
051 public NIOOutputStream(WritableByteChannel out) {
052 this(out, BUFFER_SIZE);
053 }
054
055 /**
056 * Creates a new buffered output stream to write data to the specified
057 * underlying output stream with the specified buffer size.
058 *
059 * @param out the underlying output stream.
060 * @param size the buffer size.
061 * @throws IllegalArgumentException if size <= 0.
062 */
063 public NIOOutputStream(WritableByteChannel out, int size) {
064 this.out = out;
065 if (size <= 0) {
066 throw new IllegalArgumentException("Buffer size <= 0");
067 }
068 buffer = new byte[size];
069 byteBuffer = ByteBuffer.wrap(buffer);
070 }
071
072 /**
073 * write a byte on to the stream
074 *
075 * @param b - byte to write
076 * @throws IOException
077 */
078 public void write(int b) throws IOException {
079 checkClosed();
080 if (availableBufferToWrite() < 1) {
081 flush();
082 }
083 buffer[count++] = (byte)b;
084 }
085
086 /**
087 * write a byte array to the stream
088 *
089 * @param b the byte buffer
090 * @param off the offset into the buffer
091 * @param len the length of data to write
092 * @throws IOException
093 */
094 public void write(byte b[], int off, int len) throws IOException {
095 checkClosed();
096 if (availableBufferToWrite() < len) {
097 flush();
098 }
099 if (buffer.length >= len) {
100 System.arraycopy(b, off, buffer, count, len);
101 count += len;
102 } else {
103 write(ByteBuffer.wrap(b, off, len));
104 }
105 }
106
107 /**
108 * flush the data to the output stream This doesn't call flush on the
109 * underlying outputstream, because Tcp is particularly efficent at doing
110 * this itself ....
111 *
112 * @throws IOException
113 */
114 public void flush() throws IOException {
115 if (count > 0 && out != null) {
116 byteBuffer.position(0);
117 byteBuffer.limit(count);
118 write(byteBuffer);
119 count = 0;
120 }
121 }
122
123 /**
124 * close this stream
125 *
126 * @throws IOException
127 */
128 public void close() throws IOException {
129 super.close();
130 closed = true;
131 }
132
133 /**
134 * Checks that the stream has not been closed
135 *
136 * @throws IOException
137 */
138 protected void checkClosed() throws IOException {
139 if (closed) {
140 throw new EOFException("Cannot write to the stream any more it has already been closed");
141 }
142 }
143
144 /**
145 * @return the amount free space in the buffer
146 */
147 private int availableBufferToWrite() {
148 return buffer.length - count;
149 }
150
151 protected void write(ByteBuffer data) throws IOException {
152 int remaining = data.remaining();
153 int lastRemaining = remaining - 1;
154 long delay = 1;
155 try {
156 writeTimestamp = System.currentTimeMillis();
157 while (remaining > 0) {
158
159 // We may need to do a little bit of sleeping to avoid a busy loop.
160 // Slow down if no data was written out..
161 if (remaining == lastRemaining) {
162 try {
163 // Use exponential rollback to increase sleep time.
164 Thread.sleep(delay);
165 delay *= 2;
166 if (delay > 1000) {
167 delay = 1000;
168 }
169 } catch (InterruptedException e) {
170 throw new InterruptedIOException();
171 }
172 } else {
173 delay = 1;
174 }
175 lastRemaining = remaining;
176
177 // Since the write is non-blocking, all the data may not have been
178 // written.
179 out.write(data);
180 remaining = data.remaining();
181 }
182 } finally {
183 writeTimestamp = -1;
184 }
185 }
186
187
188 /* (non-Javadoc)
189 * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
190 */
191 public boolean isWriting() {
192 return writeTimestamp > 0;
193 }
194
195 /* (non-Javadoc)
196 * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
197 */
198 public long getWriteTimestamp() {
199 return writeTimestamp;
200 }
201
202 }