Checkstyle compliance for DataStoreJobCoordinator, incl. clean JavaDoc
[genius.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / genius / datastoreutils / DataStoreJobCoordinator.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.datastoreutils;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ForkJoinPool;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.ReentrantLock;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 public class DataStoreJobCoordinator {
29
30     private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
31
32     private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
33     private static final long RETRY_WAIT_BASE_TIME = 100;
34
35     private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
36     private final ForkJoinPool fjPool;
37     private final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
38     private final ReentrantLock reentrantLock = new ReentrantLock();
39     private final Condition waitCondition = reentrantLock.newCondition();
40
41     private static DataStoreJobCoordinator instance;
42
43     static {
44         instance = new DataStoreJobCoordinator();
45     }
46
47     public static DataStoreJobCoordinator getInstance() {
48         return instance;
49     }
50
51     private DataStoreJobCoordinator() {
52         fjPool = new ForkJoinPool();
53
54         for (int i = 0; i < THREADPOOL_SIZE; i++) {
55             Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
56             jobQueueMap.put(i, jobEntriesMap);
57         }
58
59         new Thread(new JobQueueHandler()).start();
60     }
61
62     public void enqueueJob(String key,
63             Callable<List<ListenableFuture<Void>>> mainWorker) {
64         enqueueJob(key, mainWorker, null, 0);
65     }
66
67     public void enqueueJob(String key,
68             Callable<List<ListenableFuture<Void>>> mainWorker,
69             RollbackCallable rollbackWorker) {
70         enqueueJob(key, mainWorker, rollbackWorker, 0);
71     }
72
73     public void enqueueJob(String key,
74             Callable<List<ListenableFuture<Void>>> mainWorker,
75             int maxRetries) {
76         enqueueJob(key, mainWorker, null, maxRetries);
77     }
78
79     public void enqueueJob(AbstractDataStoreJob job) throws InvalidJobException {
80         job.validate();
81         enqueueJob(job.getJobQueueKey(), job);
82     }
83
84     /**
85      * Enqueue a Job with an appropriate key.
86      * A JobEntry is created and queued appropriately.
87      */
88     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
89                            RollbackCallable rollbackWorker, int maxRetries) {
90         JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
91         Integer hashKey = getHashKey(key);
92         LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
93
94         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
95         synchronized (jobEntriesMap) {
96             JobQueue jobQueue = jobEntriesMap.get(key);
97             if (jobQueue == null) {
98                 jobQueue = new JobQueue();
99             }
100             LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
101             jobQueue.addEntry(jobEntry);
102             jobEntriesMap.put(key, jobQueue);
103         }
104         reentrantLock.lock();
105         try {
106             waitCondition.signal();
107         } finally {
108             reentrantLock.unlock();
109         }
110     }
111
112     /**
113      * Cleanup the submitted job from the job queue.
114      **/
115     private void clearJob(JobEntry jobEntry) {
116         Integer hashKey = getHashKey(jobEntry.getKey());
117         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
118         LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
119         synchronized (jobEntriesMap) {
120             JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
121             jobQueue.setExecutingEntry(null);
122             if (jobQueue.getWaitingEntries().isEmpty()) {
123                 LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
124                 jobEntriesMap.remove(jobEntry.getKey());
125             }
126         }
127     }
128
129     /**
130      * Generate the hashkey for the jobQueueMap.
131      */
132     private Integer getHashKey(String key) {
133         int code = key.hashCode();
134         return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
135     }
136
137     /**
138      * JobCallback class is used as a future callback for
139      * main and rollback workers to handle success and failure.
140      */
141     private class JobCallback implements FutureCallback<List<Void>> {
142         private final JobEntry jobEntry;
143
144         JobCallback(JobEntry jobEntry) {
145             this.jobEntry = jobEntry;
146         }
147
148         /**
149          * This implies that all the future instances have returned success. -- TODO: Confirm this
150          */
151         @Override
152         public void onSuccess(List<Void> voids) {
153             LOG.trace("Job {} completed successfully", jobEntry.getKey());
154             clearJob(jobEntry);
155         }
156
157         /**
158          * Handle failure callbacks.
159          * If more retry needed, the retrycount is decremented and mainworker is executed again.
160          * After retries completed, rollbackworker is executed.
161          * If rollbackworker fails, this is a double-fault. Double fault is logged and ignored.
162          */
163         @Override
164         public void onFailure(Throwable throwable) {
165             LOG.warn("Job: {} failed with exception: {} {}", jobEntry, throwable.getClass().getSimpleName(),
166                     throwable.getStackTrace());
167             if (jobEntry.getMainWorker() == null) {
168                 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
169                 clearJob(jobEntry);
170                 return;
171             }
172
173             int retryCount = jobEntry.decrementRetryCountAndGet();
174             if ( retryCount > 0) {
175                 long waitTime = RETRY_WAIT_BASE_TIME * 10 / retryCount;
176                 scheduledExecutorService.schedule(
177                     () -> {
178                         MainTask worker = new MainTask(jobEntry);
179                         fjPool.execute(worker);
180                     },
181                     waitTime,
182                     TimeUnit.MILLISECONDS);
183                 return;
184             }
185
186             if (jobEntry.getRollbackWorker() != null) {
187                 jobEntry.setMainWorker(null);
188                 RollbackTask rollbackTask = new RollbackTask(jobEntry);
189                 fjPool.execute(rollbackTask);
190                 return;
191             }
192
193             clearJob(jobEntry);
194         }
195     }
196
197     /**
198      * Execute the RollbackCallable provided by the application in the eventuality of a failure.
199      */
200     private class RollbackTask implements Runnable {
201         private final JobEntry jobEntry;
202
203         RollbackTask(JobEntry jobEntry) {
204             this.jobEntry = jobEntry;
205         }
206
207         @Override
208         @SuppressWarnings("checkstyle:illegalcatch")
209         public void run() {
210             RollbackCallable callable = jobEntry.getRollbackWorker();
211             callable.setFutures(jobEntry.getFutures());
212             List<ListenableFuture<Void>> futures = null;
213
214             try {
215                 futures = callable.call();
216             } catch (Exception e) {
217                 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
218             }
219
220             if (futures == null || futures.isEmpty()) {
221                 clearJob(jobEntry);
222                 return;
223             }
224
225             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
226             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
227             jobEntry.setFutures(futures);
228         }
229     }
230
231     /**
232      * Execute the MainWorker callable.
233      */
234     private class MainTask implements Runnable {
235         private final JobEntry jobEntry;
236
237         MainTask(JobEntry jobEntry) {
238             this.jobEntry = jobEntry;
239         }
240
241         @Override
242         @SuppressWarnings("checkstyle:illegalcatch")
243         public void run() {
244             List<ListenableFuture<Void>> futures = null;
245             long jobStartTimestamp = System.currentTimeMillis();
246             LOG.trace("Running job {}", jobEntry.getKey());
247
248             try {
249                 futures = jobEntry.getMainWorker().call();
250                 long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
251                 LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
252             } catch (Exception e) {
253                 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
254             }
255
256             if (futures == null || futures.isEmpty()) {
257                 clearJob(jobEntry);
258                 return;
259             }
260
261             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
262             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
263             jobEntry.setFutures(futures);
264         }
265     }
266
267     private class JobQueueHandler implements Runnable {
268         @Override
269         @SuppressWarnings("checkstyle:illegalcatch")
270         public void run() {
271             LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
272             while (true) {
273                 try {
274                     for (int i = 0; i < THREADPOOL_SIZE; i++) {
275                         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
276                         if (jobEntriesMap.isEmpty()) {
277                             continue;
278                         }
279
280                         LOG.trace("JobQueueHandler handling queue {} with size {}", i, jobEntriesMap.size());
281                         synchronized (jobEntriesMap) {
282                             Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
283                             while (it.hasNext()) {
284                                 Map.Entry<String, JobQueue> entry = it.next();
285                                 if (entry.getValue().getExecutingEntry() != null) {
286                                     continue;
287                                 }
288                                 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
289                                 if (jobEntry != null) {
290                                     entry.getValue().setExecutingEntry(jobEntry);
291                                     MainTask worker = new MainTask(jobEntry);
292                                     LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
293                                     fjPool.execute(worker);
294                                 } else {
295                                     it.remove();
296                                 }
297                             }
298                         }
299                     }
300
301                     reentrantLock.lock();
302                     try {
303                         if (isJobQueueEmpty()) {
304                             waitCondition.await();
305                         }
306                     } finally {
307                         reentrantLock.unlock();
308                     }
309                 } catch (Exception e) {
310                     LOG.error("Exception while executing the tasks {} ", e);
311                 } catch (Throwable e) {
312                     LOG.error("Error while executing the tasks {} ", e);
313                 }
314             }
315         }
316     }
317
318     private boolean isJobQueueEmpty() {
319         for (int i = 0; i < THREADPOOL_SIZE; i++) {
320             Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
321             if (!jobEntriesMap.isEmpty()) {
322                 return false;
323             }
324         }
325
326         return true;
327     }
328 }