001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.transport.stomp;
019
020 import java.io.ByteArrayOutputStream;
021 import java.io.DataInputStream;
022 import java.io.IOException;
023 import java.io.InputStream;
024 import java.io.OutputStream;
025 import java.net.Socket;
026 import java.net.UnknownHostException;
027 import java.util.HashMap;
028
029 public class StompConnection {
030
031 public static final long RECEIVE_TIMEOUT = 10000;
032
033 private Socket stompSocket;
034 private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
035
036 public void open(String host, int port) throws IOException, UnknownHostException {
037 open(new Socket(host, port));
038 }
039
040 public void open(Socket socket) {
041 stompSocket = socket;
042 }
043
044 public void close() throws IOException {
045 if (stompSocket != null) {
046 stompSocket.close();
047 stompSocket = null;
048 }
049 }
050
051 public void sendFrame(String data) throws Exception {
052 byte[] bytes = data.getBytes("UTF-8");
053 OutputStream outputStream = stompSocket.getOutputStream();
054 outputStream.write(bytes);
055 outputStream.write(0);
056 outputStream.flush();
057 }
058
059 public void sendFrame(String frame, byte[] data) throws Exception {
060 byte[] bytes = frame.getBytes("UTF-8");
061 OutputStream outputStream = stompSocket.getOutputStream();
062 outputStream.write(bytes);
063 outputStream.write(data);
064 outputStream.write(0);
065 outputStream.flush();
066 }
067
068 public StompFrame receive() throws Exception {
069 return receive(RECEIVE_TIMEOUT);
070 }
071
072 public StompFrame receive(long timeOut) throws Exception {
073 stompSocket.setSoTimeout((int)timeOut);
074 InputStream is = stompSocket.getInputStream();
075 StompWireFormat wf = new StompWireFormat();
076 DataInputStream dis = new DataInputStream(is);
077 return (StompFrame)wf.unmarshal(dis);
078 }
079
080 public String receiveFrame() throws Exception {
081 return receiveFrame(RECEIVE_TIMEOUT);
082 }
083
084 public String receiveFrame(long timeOut) throws Exception {
085 stompSocket.setSoTimeout((int)timeOut);
086 InputStream is = stompSocket.getInputStream();
087 int c = 0;
088 for (;;) {
089 c = is.read();
090 if (c < 0) {
091 throw new IOException("socket closed.");
092 } else if (c == 0) {
093 c = is.read();
094 if (c == '\n') {
095 // end of frame
096 return stringFromBuffer(inputBuffer);
097 } else {
098 inputBuffer.write(0);
099 inputBuffer.write(c);
100 }
101 } else {
102 inputBuffer.write(c);
103 }
104 }
105 }
106
107 private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
108 byte[] ba = inputBuffer.toByteArray();
109 inputBuffer.reset();
110 return new String(ba, "UTF-8");
111 }
112
113 public Socket getStompSocket() {
114 return stompSocket;
115 }
116
117 public void setStompSocket(Socket stompSocket) {
118 this.stompSocket = stompSocket;
119 }
120
121 public void connect(String username, String password) throws Exception {
122 connect(username, password, null);
123 }
124
125 public void connect(String username, String password, String client) throws Exception {
126 HashMap<String, String> headers = new HashMap();
127 headers.put("login", username);
128 headers.put("passcode", password);
129 if (client != null) {
130 headers.put("client-id", client);
131 }
132 StompFrame frame = new StompFrame("CONNECT", headers);
133 sendFrame(frame.format());
134
135 StompFrame connect = receive();
136 if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
137 throw new Exception ("Not connected: " + connect.getBody());
138 }
139 }
140
141 public void disconnect() throws Exception {
142 StompFrame frame = new StompFrame("DISCONNECT");
143 sendFrame(frame.format());
144 }
145
146 public void send(String destination, String message) throws Exception {
147 send(destination, message, null, null);
148 }
149
150 public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
151 if (headers == null) {
152 headers = new HashMap<String, String>();
153 }
154 headers.put("destination", destination);
155 if (transaction != null) {
156 headers.put("transaction", transaction);
157 }
158 StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
159 sendFrame(frame.format());
160 }
161
162 public void subscribe(String destination) throws Exception {
163 subscribe(destination, null, null);
164 }
165
166 public void subscribe(String destination, String ack) throws Exception {
167 subscribe(destination, ack, new HashMap<String, String>());
168 }
169
170 public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
171 if (headers == null) {
172 headers = new HashMap<String, String>();
173 }
174 headers.put("destination", destination);
175 if (ack != null) {
176 headers.put("ack", ack);
177 }
178 StompFrame frame = new StompFrame("SUBSCRIBE", headers);
179 sendFrame(frame.format());
180 }
181
182 public void unsubscribe(String destination) throws Exception {
183 unsubscribe(destination, null);
184 }
185
186 public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
187 if (headers == null) {
188 headers = new HashMap<String, String>();
189 }
190 headers.put("destination", destination);
191 StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
192 sendFrame(frame.format());
193 }
194
195 public void begin(String transaction) throws Exception {
196 HashMap<String, String> headers = new HashMap<String, String>();
197 headers.put("transaction", transaction);
198 StompFrame frame = new StompFrame("BEGIN", headers);
199 sendFrame(frame.format());
200 }
201
202 public void abort(String transaction) throws Exception {
203 HashMap<String, String> headers = new HashMap<String, String>();
204 headers.put("transaction", transaction);
205 StompFrame frame = new StompFrame("ABORT", headers);
206 sendFrame(frame.format());
207 }
208
209 public void commit(String transaction) throws Exception {
210 HashMap<String, String> headers = new HashMap<String, String>();
211 headers.put("transaction", transaction);
212 StompFrame frame = new StompFrame("COMMIT", headers);
213 sendFrame(frame.format());
214 }
215
216 public void ack(StompFrame frame) throws Exception {
217 ack(frame.getHeaders().get("message-id"), null);
218 }
219
220 public void ack(StompFrame frame, String transaction) throws Exception {
221 ack(frame.getHeaders().get("message-id"), transaction);
222 }
223
224 public void ack(String messageId) throws Exception {
225 ack(messageId, null);
226 }
227
228 public void ack(String messageId, String transaction) throws Exception {
229 HashMap<String, String> headers = new HashMap<String, String>();
230 headers.put("message-id", messageId);
231 if (transaction != null)
232 headers.put("transaction", transaction);
233 StompFrame frame = new StompFrame("ACK", headers);
234 sendFrame(frame.format());
235 }
236
237 protected String appendHeaders(HashMap<String, Object> headers) {
238 StringBuffer result = new StringBuffer();
239 for (String key : headers.keySet()) {
240 result.append(key + ":" + headers.get(key) + "\n");
241 }
242 result.append("\n");
243 return result.toString();
244 }
245
246 }