Add missing license headers to packetcable-driver base
[packetcable.git] / packetcable-driver / src / main / java / org / pcmm / concurrent / impl / WorkerPool.java
1 package org.pcmm.concurrent.impl;
2
3 import java.lang.ref.WeakReference;
4 import java.util.HashMap;
5 import java.util.Iterator;
6 import java.util.Map;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.Executors;
10
11 import org.pcmm.concurrent.IWorker;
12 import org.pcmm.concurrent.IWorkerPool;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15
16 /**
17  * Pool to manage PCMM workers
18  */
19 public class WorkerPool implements IWorkerPool {
20
21         /**
22          * 
23          */
24         private Map<Integer, WeakReference<IWorker>> workersMap;
25
26         private Logger logger = LoggerFactory.getLogger(IWorkerPool.class);
27         private ExecutorService executor;
28
29         public WorkerPool() {
30                 this(DEFAULT_MAX_WORKERS);
31         }
32
33         public WorkerPool(int size) {
34                 logger.info("Pool size :" + size);
35                 workersMap = new HashMap<Integer, WeakReference<IWorker>>();
36                 executor = Executors.newFixedThreadPool(size);
37         }
38
39         /*
40          * (non-Javadoc)
41          * 
42          * @see org.pcmm.threading.IWorkerPool#schedule(org.pcmm.threading.IWorker,
43          * int)
44          */
45         @Override
46         public int schedule(IWorker worker, int t) {
47                 if (worker == null)
48                         return -1;
49                 logger.debug("woker[" + worker + "] added, starts in " + t + " ms");
50                 WeakReference<IWorker> workerRef = new WeakReference<IWorker>(worker);
51                 int ref = workerRef.hashCode();
52                 workersMap.put(ref, workerRef);
53                 worker.shouldWait(t);
54                 executor.execute(worker);
55                 return ref;
56         }
57
58         /*
59          * (non-Javadoc)
60          * 
61          * @see
62          * org.pcmm.concurrent.IWorkerPool#schedule(org.pcmm.concurrent.IWorker)
63          */
64         @Override
65         public int schedule(IWorker worker) {
66                 return schedule(worker, 0);
67         }
68
69         /*
70          * (non-Javadoc)
71          * 
72          * @see org.pcmm.concurrent.IWorkerPool#sendKillSignal(int)
73          */
74         @Override
75         public void sendKillSignal(int pid) {
76                 if (workersMap.size() > 0) {
77                         WeakReference<IWorker> weakRef = workersMap.get(pid);
78                         if (weakRef != null) {
79                                 IWorker ref = weakRef.get();
80                                 if (ref != null)
81                                         ref.done();
82                                 if (!weakRef.isEnqueued()) {
83                                         weakRef.clear();
84                                         weakRef.enqueue();
85                                 }
86                         }
87                 }
88
89         }
90
91         /*
92          * (non-Javadoc)
93          * 
94          * @see org.pcmm.threading.IWorkerPool#killAll()
95          */
96         @Override
97         public void killAll() {
98                 for (WeakReference<IWorker> weakRef : workersMap.values()) {
99                         IWorker ref = weakRef.get();
100                         if (ref != null)
101                                 ref.done();
102                         if (!weakRef.isEnqueued()) {
103                                 weakRef.clear();
104                                 weakRef.enqueue();
105                         }
106                 }
107                 recycle();
108         }
109
110         /*
111          * (non-Javadoc)
112          * 
113          * @see org.pcmm.threading.IWorkerPool#recycle()
114          */
115         @Override
116         public void recycle() {
117                 for (Iterator<Integer> pids = workersMap.keySet().iterator(); pids.hasNext();) {
118                         WeakReference<IWorker> weakRef = workersMap.get(pids.next());
119                         IWorker ref = weakRef.get();
120                         if (ref == null) {
121                                 if (!weakRef.isEnqueued()) {
122                                         weakRef.clear();
123                                         weakRef.enqueue();
124                                 }
125                                 workersMap.remove(weakRef);
126                         }
127                 }
128
129         }
130
131         @Override
132         public Object adapt(Object object, Class<?> clazz) {
133                 if (clazz.isAssignableFrom(object.getClass()))
134                         return object;
135                 return null;
136         }
137
138         @Override
139         public IWorker adapt(Object object) {
140                 IWorker worker = (IWorker) adapt(object, IWorker.class);
141                 if (worker == null) {
142                         if (object instanceof Callable)
143                                 worker = new Worker((Callable<?>) object);
144                         else if (object instanceof Runnable) {
145                                 final Runnable runner = (Runnable) object;
146                                 worker = new Worker(new Callable<Object>() {
147                                         @Override
148                                         public Object call() throws Exception {
149                                                 ((Runnable) runner).run();
150                                                 return null;
151                                         }
152                                 });
153                         }
154                 }
155                 return worker;
156         }
157
158 }