001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.thread;
018
019 import java.util.concurrent.Executor;
020 import java.util.concurrent.ExecutorService;
021 import java.util.concurrent.SynchronousQueue;
022 import java.util.concurrent.ThreadFactory;
023 import java.util.concurrent.ThreadPoolExecutor;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.atomic.AtomicLong;
026
027 /**
028 * Manages the thread pool for long running tasks. Long running tasks are not
029 * always active but when they are active, they may need a few iterations of
030 * processing for them to become idle. The manager ensures that each task is
031 * processes but that no one task overtakes the system. This is kinda like
032 * cooperative multitasking.
033 *
034 *
035 */
036 public class TaskRunnerFactory implements Executor {
037
038 private ExecutorService executor;
039 private int maxIterationsPerRun;
040 private String name;
041 private int priority;
042 private boolean daemon;
043 private AtomicLong id = new AtomicLong(0);
044
045 public TaskRunnerFactory() {
046 this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
047 }
048
049 private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
050 this(name,priority,daemon,maxIterationsPerRun,false);
051 }
052
053
054 public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
055
056 this.name = name;
057 this.priority = priority;
058 this.daemon = daemon;
059 this.maxIterationsPerRun = maxIterationsPerRun;
060
061 // If your OS/JVM combination has a good thread model, you may want to
062 // avoid
063 // using a thread pool to run tasks and use a DedicatedTaskRunner
064 // instead.
065 if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
066 executor = null;
067 } else {
068 executor = createDefaultExecutor();
069 }
070 }
071
072 public void shutdown() {
073 if (executor != null) {
074 executor.shutdownNow();
075 }
076 }
077
078 public TaskRunner createTaskRunner(Task task, String name) {
079 if (executor != null) {
080 return new PooledTaskRunner(executor, task, maxIterationsPerRun);
081 } else {
082 return new DedicatedTaskRunner(task, name, priority, daemon);
083 }
084 }
085
086 public void execute(Runnable runnable) {
087 execute(runnable, "ActiveMQ Task");
088 }
089
090 public void execute(Runnable runnable, String name) {
091 if (executor != null) {
092 executor.execute(runnable);
093 } else {
094 new Thread(runnable, name + "-" + id.incrementAndGet()).start();
095 }
096 }
097
098 protected ExecutorService createDefaultExecutor() {
099 ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
100 public Thread newThread(Runnable runnable) {
101 Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
102 thread.setDaemon(daemon);
103 thread.setPriority(priority);
104 return thread;
105 }
106 });
107 // rc.allowCoreThreadTimeOut(true);
108 return rc;
109 }
110
111 }