001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.vm;
018
019 import java.io.IOException;
020 import java.net.InetSocketAddress;
021 import java.net.URI;
022 import java.util.concurrent.atomic.AtomicInteger;
023
024 import org.apache.activemq.command.BrokerInfo;
025 import org.apache.activemq.transport.MutexTransport;
026 import org.apache.activemq.transport.ResponseCorrelator;
027 import org.apache.activemq.transport.Transport;
028 import org.apache.activemq.transport.TransportAcceptListener;
029 import org.apache.activemq.transport.TransportServer;
030
031 /**
032 * Broker side of the VMTransport
033 */
034 public class VMTransportServer implements TransportServer {
035
036 private TransportAcceptListener acceptListener;
037 private final URI location;
038 private boolean disposed;
039
040 private final AtomicInteger connectionCount = new AtomicInteger(0);
041 private final boolean disposeOnDisconnect;
042
043 /**
044 * @param location
045 * @param disposeOnDisconnect
046 */
047 public VMTransportServer(URI location, boolean disposeOnDisconnect) {
048 this.location = location;
049 this.disposeOnDisconnect = disposeOnDisconnect;
050 }
051
052 /**
053 * @return a pretty print of this
054 */
055 public String toString() {
056 return "VMTransportServer(" + location + ")";
057 }
058
059 /**
060 * @return new VMTransport
061 * @throws IOException
062 */
063 public VMTransport connect() throws IOException {
064 TransportAcceptListener al;
065 synchronized (this) {
066 if (disposed) {
067 throw new IOException("Server has been disposed.");
068 }
069 al = acceptListener;
070 }
071 if (al == null) {
072 throw new IOException("Server TransportAcceptListener is null.");
073 }
074
075 connectionCount.incrementAndGet();
076 VMTransport client = new VMTransport(location) {
077 public void stop() throws Exception {
078 if (stopping.compareAndSet(false, true) && !disposed) {
079 super.stop();
080 if (connectionCount.decrementAndGet() == 0
081 && disposeOnDisconnect) {
082 VMTransportServer.this.stop();
083 }
084 }
085 };
086 };
087
088 VMTransport server = new VMTransport(location);
089 client.setPeer(server);
090 server.setPeer(client);
091 al.onAccept(configure(server));
092 return client;
093 }
094
095 /**
096 * Configure transport
097 *
098 * @param transport
099 * @return the Transport
100 */
101 public static Transport configure(Transport transport) {
102 transport = new MutexTransport(transport);
103 transport = new ResponseCorrelator(transport);
104 return transport;
105 }
106
107 /**
108 * Set the Transport accept listener for new Connections
109 *
110 * @param acceptListener
111 */
112 public synchronized void setAcceptListener(TransportAcceptListener acceptListener) {
113 this.acceptListener = acceptListener;
114 }
115
116 public void start() throws IOException {
117 }
118
119 public void stop() throws IOException {
120 VMTransportFactory.stopped(this);
121 }
122
123 public URI getConnectURI() {
124 return location;
125 }
126
127 public URI getBindURI() {
128 return location;
129 }
130
131 public void setBrokerInfo(BrokerInfo brokerInfo) {
132 }
133
134 public InetSocketAddress getSocketAddress() {
135 return null;
136 }
137
138 public int getConnectionCount() {
139 return connectionCount.intValue();
140 }
141 }