001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.thread;
018    
019    import java.util.concurrent.Executor;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.SynchronousQueue;
022    import java.util.concurrent.ThreadFactory;
023    import java.util.concurrent.ThreadPoolExecutor;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    import java.util.concurrent.atomic.AtomicLong;
027    
028    /**
029     * Manages the thread pool for long running tasks. Long running tasks are not
030     * always active but when they are active, they may need a few iterations of
031     * processing for them to become idle. The manager ensures that each task is
032     * processes but that no one task overtakes the system. This is kinda like
033     * cooperative multitasking.
034     *
035     * @org.apache.xbean.XBean
036     */
037    public class TaskRunnerFactory implements Executor {
038    
039        private ExecutorService executor;
040        private int maxIterationsPerRun;
041        private String name;
042        private int priority;
043        private boolean daemon;
044        private AtomicLong id = new AtomicLong(0);
045        private boolean dedicatedTaskRunner;
046        private AtomicBoolean initDone = new AtomicBoolean(false);
047    
048        public TaskRunnerFactory() {
049            this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
050        }
051    
052        private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
053            this(name,priority,daemon,maxIterationsPerRun,false);
054        }
055    
056        public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
057            this.name = name;
058            this.priority = priority;
059            this.daemon = daemon;
060            this.maxIterationsPerRun = maxIterationsPerRun;
061            this.dedicatedTaskRunner = dedicatedTaskRunner;
062        }
063    
064        public void init() {
065            if (initDone.compareAndSet(false, true)) {
066                // If your OS/JVM combination has a good thread model, you may want to
067                // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead.
068                if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
069                    executor = null;
070                } else if (executor == null) {
071                    executor = createDefaultExecutor();
072                }
073            }
074        }
075    
076        public void shutdown() {
077            if (executor != null) {
078                executor.shutdownNow();
079            }
080            initDone.set(false);
081        }
082    
083        public TaskRunner createTaskRunner(Task task, String name) {
084            init();
085            if (executor != null) {
086                return new PooledTaskRunner(executor, task, maxIterationsPerRun);
087            } else {
088                return new DedicatedTaskRunner(task, name, priority, daemon);
089            }
090        }
091    
092        public void execute(Runnable runnable) {
093            execute(runnable, "ActiveMQ Task");
094        }
095    
096        public void execute(Runnable runnable, String name) {
097            init();
098            if (executor != null) {
099                executor.execute(runnable);
100            } else {
101                new Thread(runnable, name + "-" + id.incrementAndGet()).start();
102            }
103        }
104    
105        protected ExecutorService createDefaultExecutor() {
106            ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
107                public Thread newThread(Runnable runnable) {
108                    Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
109                    thread.setDaemon(daemon);
110                    thread.setPriority(priority);
111                    return thread;
112                }
113            });
114            return rc;
115        }
116    
117        public ExecutorService getExecutor() {
118            return executor;
119        }
120    
121        public void setExecutor(ExecutorService executor) {
122            this.executor = executor;
123        }
124    
125        public int getMaxIterationsPerRun() {
126            return maxIterationsPerRun;
127        }
128    
129        public void setMaxIterationsPerRun(int maxIterationsPerRun) {
130            this.maxIterationsPerRun = maxIterationsPerRun;
131        }
132    
133        public String getName() {
134            return name;
135        }
136    
137        public void setName(String name) {
138            this.name = name;
139        }
140    
141        public int getPriority() {
142            return priority;
143        }
144    
145        public void setPriority(int priority) {
146            this.priority = priority;
147        }
148    
149        public boolean isDaemon() {
150            return daemon;
151        }
152    
153        public void setDaemon(boolean daemon) {
154            this.daemon = daemon;
155        }
156    
157        public boolean isDedicatedTaskRunner() {
158            return dedicatedTaskRunner;
159        }
160    
161        public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
162            this.dedicatedTaskRunner = dedicatedTaskRunner;
163        }
164    }