001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.nio;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.EOFException;
022 import java.io.IOException;
023 import java.net.Socket;
024 import java.net.URI;
025 import java.net.UnknownHostException;
026 import java.nio.ByteBuffer;
027 import java.nio.channels.SelectionKey;
028 import java.nio.channels.SocketChannel;
029
030 import javax.net.SocketFactory;
031
032 import org.apache.activemq.command.Command;
033 import org.apache.activemq.transport.Transport;
034 import org.apache.activemq.transport.tcp.TcpTransport;
035 import org.apache.activemq.util.IOExceptionSupport;
036 import org.apache.activemq.util.ServiceStopper;
037 import org.apache.activemq.wireformat.WireFormat;
038
039 /**
040 * An implementation of the {@link Transport} interface using raw tcp/ip
041 *
042 *
043 */
044 public class NIOTransport extends TcpTransport {
045
046 // private static final Logger log = LoggerFactory.getLogger(NIOTransport.class);
047 private SocketChannel channel;
048 private SelectorSelection selection;
049 private ByteBuffer inputBuffer;
050 private ByteBuffer currentBuffer;
051 private int nextFrameSize;
052
053 public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
054 super(wireFormat, socketFactory, remoteLocation, localLocation);
055 }
056
057 public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
058 super(wireFormat, socket);
059 }
060
061 protected void initializeStreams() throws IOException {
062 channel = socket.getChannel();
063 channel.configureBlocking(false);
064
065 // listen for events telling us when the socket is readable.
066 selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
067 public void onSelect(SelectorSelection selection) {
068 serviceRead();
069 }
070
071 public void onError(SelectorSelection selection, Throwable error) {
072 if (error instanceof IOException) {
073 onException((IOException)error);
074 } else {
075 onException(IOExceptionSupport.create(error));
076 }
077 }
078 });
079
080 // Send the data via the channel
081 // inputBuffer = ByteBuffer.allocateDirect(8*1024);
082 inputBuffer = ByteBuffer.allocate(8 * 1024);
083 currentBuffer = inputBuffer;
084 nextFrameSize = -1;
085 currentBuffer.limit(4);
086 NIOOutputStream outPutStream = new NIOOutputStream(channel, 16 * 1024);
087 this.dataOut = new DataOutputStream(outPutStream);
088 this.buffOut = outPutStream;
089 }
090
091 private void serviceRead() {
092 try {
093 while (true) {
094
095 int readSize = channel.read(currentBuffer);
096 if (readSize == -1) {
097 onException(new EOFException());
098 selection.close();
099 break;
100 }
101 if (readSize == 0) {
102 break;
103 }
104
105 if (currentBuffer.hasRemaining()) {
106 continue;
107 }
108
109 // Are we trying to figure out the size of the next frame?
110 if (nextFrameSize == -1) {
111 assert inputBuffer == currentBuffer;
112
113 // If the frame is too big to fit in our direct byte buffer,
114 // Then allocate a non direct byte buffer of the right size
115 // for it.
116 inputBuffer.flip();
117 nextFrameSize = inputBuffer.getInt() + 4;
118 if (nextFrameSize > inputBuffer.capacity()) {
119 currentBuffer = ByteBuffer.allocate(nextFrameSize);
120 currentBuffer.putInt(nextFrameSize);
121 } else {
122 inputBuffer.limit(nextFrameSize);
123 }
124
125 } else {
126 currentBuffer.flip();
127
128 Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
129 doConsume((Command)command);
130
131 nextFrameSize = -1;
132 inputBuffer.clear();
133 inputBuffer.limit(4);
134 currentBuffer = inputBuffer;
135 }
136
137 }
138
139 } catch (IOException e) {
140 onException(e);
141 } catch (Throwable e) {
142 onException(IOExceptionSupport.create(e));
143 }
144 }
145
146 protected void doStart() throws Exception {
147 connect();
148 selection.setInterestOps(SelectionKey.OP_READ);
149 selection.enable();
150 }
151
152 protected void doStop(ServiceStopper stopper) throws Exception {
153 if (selection != null) {
154 selection.close();
155 }
156 super.doStop(stopper);
157 }
158 }