001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.nio;
018
019 import java.io.IOException;
020 import java.nio.channels.SocketChannel;
021 import java.util.LinkedList;
022 import java.util.concurrent.Executor;
023 import java.util.concurrent.ExecutorService;
024 import java.util.concurrent.SynchronousQueue;
025 import java.util.concurrent.ThreadFactory;
026 import java.util.concurrent.ThreadPoolExecutor;
027 import java.util.concurrent.TimeUnit;
028
029 /**
030 * The SelectorManager will manage one Selector and the thread that checks the
031 * selector.
032 *
033 * We may need to consider running more than one thread to check the selector if
034 * servicing the selector takes too long.
035 *
036 * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
037 */
038 public final class SelectorManager {
039
040 public static final SelectorManager SINGLETON = new SelectorManager();
041
042 private Executor selectorExecutor = createDefaultExecutor();
043 private Executor channelExecutor = selectorExecutor;
044 private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
045 private int maxChannelsPerWorker = 1024;
046
047 protected ExecutorService createDefaultExecutor() {
048 ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
049 public Thread newThread(Runnable runnable) {
050 return new Thread(runnable, "ActiveMQ NIO Worker");
051 }
052 });
053 // rc.allowCoreThreadTimeOut(true);
054 return rc;
055 }
056
057 public static SelectorManager getInstance() {
058 return SINGLETON;
059 }
060
061 public interface Listener {
062 void onSelect(SelectorSelection selector);
063
064 void onError(SelectorSelection selection, Throwable error);
065 }
066
067
068 public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
069 throws IOException {
070
071 SelectorSelection selection = null;
072 while( selection == null ) {
073 if (freeWorkers.size() > 0) {
074 SelectorWorker worker = freeWorkers.getFirst();
075 if( worker.isReleased() ) {
076 freeWorkers.remove(worker);
077 } else {
078 worker.retain();
079 selection = new SelectorSelection(worker, socketChannel, listener);
080 }
081
082 } else {
083 // Worker starts /w retain count of 1
084 SelectorWorker worker = new SelectorWorker(this);
085 freeWorkers.addFirst(worker);
086 selection = new SelectorSelection(worker, socketChannel, listener);
087 }
088 }
089
090 return selection;
091 }
092
093 synchronized void onWorkerFullEvent(SelectorWorker worker) {
094 freeWorkers.remove(worker);
095 }
096
097 public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
098 freeWorkers.remove(worker);
099 }
100
101 public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
102 freeWorkers.addFirst(worker);
103 }
104
105 public Executor getChannelExecutor() {
106 return channelExecutor;
107 }
108
109 public void setChannelExecutor(Executor channelExecutor) {
110 this.channelExecutor = channelExecutor;
111 }
112
113 public int getMaxChannelsPerWorker() {
114 return maxChannelsPerWorker;
115 }
116
117 public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
118 this.maxChannelsPerWorker = maxChannelsPerWorker;
119 }
120
121 public Executor getSelectorExecutor() {
122 return selectorExecutor;
123 }
124
125 public void setSelectorExecutor(Executor selectorExecutor) {
126 this.selectorExecutor = selectorExecutor;
127 }
128
129 }