TestDataStoreJobCoordinator with waitForAllJobs() methods required in tests
genius.git: mdsalutil/mdsalutil-api/src/main/java/org/opendaylight/genius/datastoreutils/DataStoreJobCoordinator.java
/*
 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.genius.datastoreutils;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

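/**
 * Coordinates datastore jobs: jobs enqueued with the same key are executed one at a time in
 * enqueue order, while jobs enqueued with different keys may run in parallel on a ForkJoinPool.
 * Obtain the singleton via {@link #getInstance()} and submit work through one of the
 * {@code enqueueJob(...)} overloads.
 */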
public class DataStoreJobCoordinator {

    private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);

    private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
    private static final long RETRY_WAIT_BASE_TIME = 100;

    // package local instead of private for TestDataStoreJobCoordinator
    final ForkJoinPool fjPool;
    final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();

    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
    private final ReentrantLock reentrantLock = new ReentrantLock();
    private final Condition waitCondition = reentrantLock.newCondition();

    private static DataStoreJobCoordinator instance;

    static {
        instance = new DataStoreJobCoordinator();
    }

    public static DataStoreJobCoordinator getInstance() {
        return instance;
    }

    private DataStoreJobCoordinator() {
        fjPool = new ForkJoinPool();

        for (int i = 0; i < THREADPOOL_SIZE; i++) {
            Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
            jobQueueMap.put(i, jobEntriesMap);
        }

        new Thread(new JobQueueHandler()).start();
    }

    public void enqueueJob(String key,
            Callable<List<ListenableFuture<Void>>> mainWorker) {
        enqueueJob(key, mainWorker, null, 0);
    }

    public void enqueueJob(String key,
            Callable<List<ListenableFuture<Void>>> mainWorker,
            RollbackCallable rollbackWorker) {
        enqueueJob(key, mainWorker, rollbackWorker, 0);
    }

    public void enqueueJob(String key,
            Callable<List<ListenableFuture<Void>>> mainWorker,
            int maxRetries) {
        enqueueJob(key, mainWorker, null, maxRetries);
    }

    public void enqueueJob(AbstractDataStoreJob job) throws InvalidJobException {
        job.validate();
        enqueueJob(job.getJobQueueKey(), job);
    }

    /**
     * Enqueue a job with the given key. A JobEntry is created and added to the queue for that
     * key; jobs sharing a key are executed sequentially in enqueue order, while jobs with a
     * different key may be executed in parallel.
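     *
     * <p>A minimal usage sketch (illustrative only; {@code dataBroker}, {@code ifName},
     * {@code path} and {@code data} are assumed to be supplied by the calling application,
     * and the transaction calls are the MD-SAL binding API):
     *
     * <pre>{@code
     * DataStoreJobCoordinator coordinator = DataStoreJobCoordinator.getInstance();
     * coordinator.enqueueJob("interface-" + ifName, () -> {
     *     WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
     *     tx.put(LogicalDatastoreType.CONFIGURATION, path, data);
     *     List<ListenableFuture<Void>> futures = new ArrayList<>();
     *     futures.add(tx.submit());
     *     return futures;
     * }, null, 3); // no rollback worker, retry the main worker up to 3 times
     * }</pre>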
     */
    public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
                           RollbackCallable rollbackWorker, int maxRetries) {
        JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
        Integer hashKey = getHashKey(key);
        LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);

        Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
        synchronized (jobEntriesMap) {
            JobQueue jobQueue = jobEntriesMap.get(key);
            if (jobQueue == null) {
                jobQueue = new JobQueue();
            }
            LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
            jobQueue.addEntry(jobEntry);
            jobEntriesMap.put(key, jobQueue);
        }
        reentrantLock.lock();
        try {
            waitCondition.signal();
        } finally {
            reentrantLock.unlock();
        }
    }

    /**
     * Clean up the submitted job from the job queue.
     */
    private void clearJob(JobEntry jobEntry) {
        Integer hashKey = getHashKey(jobEntry.getKey());
        Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
        LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
        synchronized (jobEntriesMap) {
            JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
            jobQueue.setExecutingEntry(null);
            if (jobQueue.getWaitingEntries().isEmpty()) {
                LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
                jobEntriesMap.remove(jobEntry.getKey());
            }
        }
    }

    /**
     * Generate the hash key for the jobQueueMap. The double modulo maps {@code key.hashCode()},
     * which may be negative, onto the bucket range {@code [0, THREADPOOL_SIZE)}.
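     *
     * <p>Worked example with illustrative values: if {@code key.hashCode()} is -3 and
     * {@code THREADPOOL_SIZE} is 4, then {@code (-3 % 4 + 4) % 4 == (-3 + 4) % 4 == 1},
     * a valid bucket index.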
     */
    private Integer getHashKey(String key) {
        int code = key.hashCode();
        return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
    }

    /**
     * JobCallback class is used as a future callback for
     * main and rollback workers to handle success and failure.
     */
    private class JobCallback implements FutureCallback<List<Void>> {
        private final JobEntry jobEntry;

        JobCallback(JobEntry jobEntry) {
            this.jobEntry = jobEntry;
        }

        /**
         * Invoked when all of the future instances have returned success. -- TODO: Confirm this
         */
        @Override
        public void onSuccess(List<Void> voids) {
            LOG.trace("Job {} completed successfully", jobEntry.getKey());
            clearJob(jobEntry);
        }

        /**
         * Handle failure callbacks.
         * If more retries are needed, the retry count is decremented and the main worker is
         * executed again. Once the retries are exhausted, the rollback worker is executed.
         * If the rollback worker also fails, this is a double fault; the double fault is
         * logged and ignored.
         */
        @Override
        public void onFailure(Throwable throwable) {
            LOG.warn("Job: {} failed with exception", jobEntry, throwable);
            if (jobEntry.getMainWorker() == null) {
                LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
                clearJob(jobEntry);
                return;
            }

            int retryCount = jobEntry.decrementRetryCountAndGet();
            if (retryCount > 0) {
                // Wait before retrying; the wait grows as the remaining retry count shrinks.
                long waitTime = RETRY_WAIT_BASE_TIME * 10 / retryCount;
                scheduledExecutorService.schedule(
                    () -> {
                        MainTask worker = new MainTask(jobEntry);
                        fjPool.execute(worker);
                    },
                    waitTime,
                    TimeUnit.MILLISECONDS);
                return;
            }

            if (jobEntry.getRollbackWorker() != null) {
                jobEntry.setMainWorker(null);
                RollbackTask rollbackTask = new RollbackTask(jobEntry);
                fjPool.execute(rollbackTask);
                return;
            }

            clearJob(jobEntry);
        }
    }

    /**
     * Execute the RollbackCallable provided by the application when the main worker ultimately
     * fails (i.e. after its retries, if any, are exhausted).
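     *
     * <p>A minimal sketch of a rollback worker (illustrative only; it assumes RollbackCallable
     * can be subclassed with a no-arg constructor, and {@code undoPartialWrites()} is a
     * hypothetical application method). The coordinator hands the main worker's futures to the
     * callable via {@code setFutures(...)} before invoking it:
     *
     * <pre>{@code
     * RollbackCallable rollbackWorker = new RollbackCallable() {
     *     public List<ListenableFuture<Void>> call() {
     *         // Undo whatever the failed main worker may have partially written.
     *         return undoPartialWrites();
     *     }
     * };
     * DataStoreJobCoordinator.getInstance().enqueueJob(jobKey, mainWorker, rollbackWorker, 3);
     * }</pre>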
     */
    private class RollbackTask implements Runnable {
        private final JobEntry jobEntry;

        RollbackTask(JobEntry jobEntry) {
            this.jobEntry = jobEntry;
        }

        @Override
        @SuppressWarnings("checkstyle:illegalcatch")
        public void run() {
            RollbackCallable callable = jobEntry.getRollbackWorker();
            callable.setFutures(jobEntry.getFutures());
            List<ListenableFuture<Void>> futures = null;

            try {
                futures = callable.call();
            } catch (Exception e) {
                LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
            }

            if (futures == null || futures.isEmpty()) {
                clearJob(jobEntry);
                return;
            }

            ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
            Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
            jobEntry.setFutures(futures);
        }
    }

    /**
     * Execute the MainWorker callable.
     */
    private class MainTask implements Runnable {
        private final JobEntry jobEntry;

        MainTask(JobEntry jobEntry) {
            this.jobEntry = jobEntry;
        }

        @Override
        @SuppressWarnings("checkstyle:illegalcatch")
        public void run() {
            List<ListenableFuture<Void>> futures = null;
            long jobStartTimestamp = System.currentTimeMillis();
            LOG.trace("Running job {}", jobEntry.getKey());

            try {
                futures = jobEntry.getMainWorker().call();
                long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
                LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
            } catch (Exception e) {
                LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
            }

            if (futures == null || futures.isEmpty()) {
                clearJob(jobEntry);
                return;
            }

            ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
            Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
            jobEntry.setFutures(futures);
        }
    }

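    /**
     * Polls the per-bucket job queues and dispatches at most one job per key at a time to the
     * ForkJoinPool; when every queue is empty it parks on {@code waitCondition} until a new job
     * is enqueued.
     */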
    private class JobQueueHandler implements Runnable {
        @Override
        @SuppressWarnings("checkstyle:illegalcatch")
        public void run() {
            LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
            while (true) {
                try {
                    for (int i = 0; i < THREADPOOL_SIZE; i++) {
                        Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
                        if (jobEntriesMap.isEmpty()) {
                            continue;
                        }

                        LOG.trace("JobQueueHandler handling queue {} with size {}", i, jobEntriesMap.size());
                        synchronized (jobEntriesMap) {
                            Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
                            while (it.hasNext()) {
                                Map.Entry<String, JobQueue> entry = it.next();
                                if (entry.getValue().getExecutingEntry() != null) {
                                    continue;
                                }
                                JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
                                if (jobEntry != null) {
                                    entry.getValue().setExecutingEntry(jobEntry);
                                    MainTask worker = new MainTask(jobEntry);
                                    LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
                                    fjPool.execute(worker);
                                } else {
                                    it.remove();
                                }
                            }
                        }
                    }

                    reentrantLock.lock();
                    try {
                        if (isJobQueueEmpty()) {
                            waitCondition.await();
                        }
                    } finally {
                        reentrantLock.unlock();
                    }
                } catch (Exception e) {
                    LOG.error("Exception while executing the tasks", e);
                } catch (Throwable e) {
                    LOG.error("Error while executing the tasks", e);
                }
            }
        }
    }

    private boolean isJobQueueEmpty() {
        for (int i = 0; i < THREADPOOL_SIZE; i++) {
            Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
            if (!jobEntriesMap.isEmpty()) {
                return false;
            }
        }

        return true;
    }

}