1 package org.pcmm.concurrent.impl;
3 import java.lang.ref.WeakReference;
4 import java.util.HashMap;
5 import java.util.Iterator;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.Executors;
11 import org.pcmm.concurrent.IWorker;
12 import org.pcmm.concurrent.IWorkerPool;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
17 * Pool to manage PCMM workers
19 public class WorkerPool implements IWorkerPool {
// Registry of scheduled workers. schedule() stores each worker wrapped in a
// WeakReference, keyed by that reference's hashCode(); the weak wrapper means
// this map alone does not keep a worker reachable.
// NOTE(review): the constructor backs this with a plain HashMap, which is not
// synchronized - confirm that callers never touch the pool from several threads.
24 private Map<Integer, WeakReference<IWorker>> workersMap;
// NOTE(review): the logger category is the IWorkerPool interface, not this
// implementation class - confirm that is intended.
26 private Logger logger = LoggerFactory.getLogger(IWorkerPool.class);
// Executor that schedule() hands workers to; created in the sized constructor.
27 private ExecutorService executor;
// Delegates to the WorkerPool(int) constructor using DEFAULT_MAX_WORKERS
// (declared outside the lines visible here) as the pool size.
30 this(DEFAULT_MAX_WORKERS);
33 public WorkerPool(int size) {
// size is logged, then used as the thread count of the executor below.
34 logger.info("Pool size :" + size);
// Starts with an empty worker registry.
35 workersMap = new HashMap<Integer, WeakReference<IWorker>>();
// Fixed-size thread pool: at most `size` submitted workers run concurrently.
// Executors.newFixedThreadPool throws IllegalArgumentException when size <= 0.
36 executor = Executors.newFixedThreadPool(size);
42 * @see org.pcmm.concurrent.IWorkerPool#schedule(org.pcmm.concurrent.IWorker,
46 public int schedule(IWorker worker, int t) {
// NOTE(review): the message announces a start delay of t ms, but none of the
// lines visible here applies t before the worker is handed to the executor -
// confirm where the delay is actually passed on.
49 logger.debug("woker[" + worker + "] added, starts in " + t + " ms");
// The pool tracks the worker only through a weak reference.
50 WeakReference<IWorker> workerRef = new WeakReference<IWorker>(worker);
// WeakReference does not override hashCode(), so this is the identity hash of
// the reference object itself, not of the worker.
// NOTE(review): identity hash codes are not guaranteed to be unique; a
// collision would silently replace an earlier entry in workersMap.
51 int ref = workerRef.hashCode();
52 workersMap.put(ref, workerRef);
// Submits the worker to the fixed thread pool for execution.
54 executor.execute(worker);
62 * org.pcmm.concurrent.IWorkerPool#schedule(org.pcmm.concurrent.IWorker)
65 public int schedule(IWorker worker) {
// Convenience overload: forwards to schedule(worker, t) with t = 0.
66 return schedule(worker, 0);
72 * @see org.pcmm.concurrent.IWorkerPool#sendKillSignal(int)
75 public void sendKillSignal(int pid) {
// pid is looked up as a key of workersMap. The size check is only a shortcut:
// get() on an empty map would return null and skip the inner block anyway.
76 if (workersMap.size() > 0) {
77 WeakReference<IWorker> weakRef = workersMap.get(pid);
78 if (weakRef != null) {
// get() returns null once the referent has been cleared (for example after the
// worker was garbage-collected).
79 IWorker ref = weakRef.get();
// NOTE(review): schedule() creates these references without a ReferenceQueue;
// per the java.lang.ref.Reference contract isEnqueued() then always returns
// false, so this condition is always true - confirm against the lines not shown.
82 if (!weakRef.isEnqueued()) {
94 * @see org.pcmm.concurrent.IWorkerPool#killAll()
97 public void killAll() {
// Visits every registered weak reference, whether or not its worker is still alive.
98 for (WeakReference<IWorker> weakRef : workersMap.values()) {
// Null when the referent has already been cleared.
99 IWorker ref = weakRef.get();
// NOTE(review): the references are created in schedule() without a
// ReferenceQueue, so isEnqueued() always returns false and this branch is
// always taken - confirm against the lines not shown.
102 if (!weakRef.isEnqueued()) {
113 * @see org.pcmm.concurrent.IWorkerPool#recycle()
116 public void recycle() {
// Walks the registry by key using an explicit iterator over keySet().
117 for (Iterator<Integer> pids = workersMap.keySet().iterator(); pids.hasNext();) {
118 WeakReference<IWorker> weakRef = workersMap.get(pids.next());
// Null when the referent has already been cleared.
119 IWorker ref = weakRef.get();
// NOTE(review): the references are created without a ReferenceQueue, so
// isEnqueued() always returns false here.
121 if (!weakRef.isEnqueued()) {
// NOTE(review): BUG - workersMap is keyed by Integer, but the WeakReference
// value is passed as the key, so Map.remove finds nothing and the entry stays
// in the map. Removing through the map while iterating keySet() would also
// raise ConcurrentModificationException on the next step; pids.remove() on the
// iterator is the safe way to drop the current entry.
125 workersMap.remove(weakRef);
132 public Object adapt(Object object, Class<?> clazz) {
// True when object's runtime class is clazz itself or a subtype/implementation
// of it. What is returned in either case is not visible in these lines.
// NOTE(review): a null object or a null clazz makes this line throw
// NullPointerException; no guard is visible here.
133 if (clazz.isAssignableFrom(object.getClass()))
139 public IWorker adapt(Object object) {
// First attempt: treat the object as an IWorker via the two-argument adapt().
// The null check below presumably relies on that method returning null when
// the type does not match - its return statements are not visible here.
140 IWorker worker = (IWorker) adapt(object, IWorker.class);
141 if (worker == null) {
// Callable is tested before Runnable, so an object implementing both is
// wrapped as a Callable.
142 if (object instanceof Callable)
143 worker = new Worker((Callable<?>) object);
144 else if (object instanceof Runnable) {
// A Runnable is bridged to a Callable so that the same Worker constructor can
// be used for it.
145 final Runnable runner = (Runnable) object;
146 worker = new Worker(new Callable<Object>() {
148 public Object call() throws Exception {
// The cast is redundant: runner is already declared as Runnable.
149 ((Runnable) runner).run();