001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import java.io.ByteArrayInputStream;
020 import java.io.DataOutputStream;
021 import java.io.EOFException;
022 import java.io.IOException;
023 import java.net.Socket;
024 import java.net.URI;
025 import java.net.UnknownHostException;
026 import java.nio.ByteBuffer;
027 import java.nio.channels.SelectionKey;
028 import java.nio.channels.SocketChannel;
029 import java.util.HashMap;
030
031 import javax.net.SocketFactory;
032
033 import org.apache.activemq.command.Command;
034 import org.apache.activemq.transport.Transport;
035 import org.apache.activemq.transport.nio.NIOOutputStream;
036 import org.apache.activemq.transport.nio.SelectorManager;
037 import org.apache.activemq.transport.nio.SelectorSelection;
038 import org.apache.activemq.transport.tcp.TcpTransport;
039 import org.apache.activemq.util.ByteArrayOutputStream;
040 import org.apache.activemq.util.ByteSequence;
041 import org.apache.activemq.util.DataByteArrayInputStream;
042 import org.apache.activemq.util.IOExceptionSupport;
043 import org.apache.activemq.util.ServiceStopper;
044 import org.apache.activemq.wireformat.WireFormat;
045
046 /**
047 * An implementation of the {@link Transport} interface for using Stomp over NIO
048 *
049 *
050 */
051 public class StompNIOTransport extends TcpTransport {
052
053 private SocketChannel channel;
054 private SelectorSelection selection;
055
056 private ByteBuffer inputBuffer;
057 ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
058 boolean processedHeaders = false;
059 String action;
060 HashMap<String, String> headers;
061 int contentLength = -1;
062 int readLength = 0;
063 int previousByte = -1;
064
065 public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
066 super(wireFormat, socketFactory, remoteLocation, localLocation);
067 }
068
069 public StompNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
070 super(wireFormat, socket);
071 }
072
073 protected void initializeStreams() throws IOException {
074 channel = socket.getChannel();
075 channel.configureBlocking(false);
076
077 // listen for events telling us when the socket is readable.
078 selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
079 public void onSelect(SelectorSelection selection) {
080 serviceRead();
081 }
082
083 public void onError(SelectorSelection selection, Throwable error) {
084 if (error instanceof IOException) {
085 onException((IOException)error);
086 } else {
087 onException(IOExceptionSupport.create(error));
088 }
089 }
090 });
091
092 inputBuffer = ByteBuffer.allocate(8 * 1024);
093 NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
094 this.dataOut = new DataOutputStream(outPutStream);
095 this.buffOut = outPutStream;
096 }
097
098 private void serviceRead() {
099 try {
100
101 while (true) {
102 // read channel
103 int readSize = channel.read(inputBuffer);
104 // channel is closed, cleanup
105 if (readSize == -1) {
106 onException(new EOFException());
107 selection.close();
108 break;
109 }
110 // nothing more to read, break
111 if (readSize == 0) {
112 break;
113 }
114
115 inputBuffer.flip();
116
117 int b;
118 ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
119
120 int i = 0;
121 while(i++ < readSize) {
122 b = input.read();
123 // skip repeating nulls
124 if (!processedHeaders && previousByte == 0 && b == 0) {
125 continue;
126 }
127
128 if (!processedHeaders) {
129 currentCommand.write(b);
130 // end of headers section, parse action and header
131 if (previousByte == '\n' && b == '\n') {
132 if (wireFormat instanceof StompWireFormat) {
133 DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
134 action = ((StompWireFormat)wireFormat).parseAction(data);
135 headers = ((StompWireFormat)wireFormat).parseHeaders(data);
136 String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
137 if (contentLengthHeader != null) {
138 contentLength = ((StompWireFormat)wireFormat).parseContentLength(contentLengthHeader);
139 } else {
140 contentLength = -1;
141 }
142 }
143 processedHeaders = true;
144 currentCommand.reset();
145 }
146 } else {
147
148 if (contentLength == -1) {
149 // end of command reached, unmarshal
150 if (b == 0) {
151 processCommand();
152 } else {
153 currentCommand.write(b);
154 }
155 } else {
156 // read desired content length
157 if (readLength++ == contentLength) {
158 processCommand();
159 readLength = 0;
160 } else {
161 currentCommand.write(b);
162 }
163 }
164 }
165
166 previousByte = b;
167 }
168 // clear the buffer
169 inputBuffer.clear();
170
171 }
172 } catch (IOException e) {
173 onException(e);
174 } catch (Throwable e) {
175 onException(IOExceptionSupport.create(e));
176 }
177 }
178
179 private void processCommand() throws Exception {
180 StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
181 doConsume(frame);
182 processedHeaders = false;
183 currentCommand.reset();
184 contentLength = -1;
185 }
186
187 protected void doStart() throws Exception {
188 connect();
189 selection.setInterestOps(SelectionKey.OP_READ);
190 selection.enable();
191 }
192
193 protected void doStop(ServiceStopper stopper) throws Exception {
194 try {
195 selection.close();
196 } catch (Exception e) {
197 e.printStackTrace();
198 }
199 super.doStop(stopper);
200 }
201 }