Merge "Spec for Counter Framework bundle"
[genius.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / genius / datastoreutils / DataStoreJobCoordinator.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.datastoreutils;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ForkJoinPool;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.ReentrantLock;
25 import org.opendaylight.genius.infra.LoggingThreadUncaughtExceptionHandler;
26 import org.opendaylight.genius.infra.ThreadFactoryProvider;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 public class DataStoreJobCoordinator {
31
32     private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
33
34     private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
35     private static final long RETRY_WAIT_BASE_TIME = 100;
36
37     // package local instead of private for TestDataStoreJobCoordinator
38     final ForkJoinPool fjPool;
39     final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
40
41     private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
42     private final ReentrantLock reentrantLock = new ReentrantLock();
43     private final Condition waitCondition = reentrantLock.newCondition();
44
45     private static DataStoreJobCoordinator instance;
46
47     static {
48         instance = new DataStoreJobCoordinator();
49     }
50
51     public static DataStoreJobCoordinator getInstance() {
52         return instance;
53     }
54
55     private DataStoreJobCoordinator() {
56         fjPool = new ForkJoinPool(
57                 Math.min(/* MAX_CAP */ 0x7fff, Runtime.getRuntime().availableProcessors()),
58                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
59                 LoggingThreadUncaughtExceptionHandler.toLOG(LOG),
60                 false);
61
62         for (int i = 0; i < THREADPOOL_SIZE; i++) {
63             Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
64             jobQueueMap.put(i, jobEntriesMap);
65         }
66
67         ThreadFactoryProvider.builder()
68             .namePrefix("DataStoreJobCoordinator-JobQueueHandler")
69             .logger(LOG)
70             .build().get()
71         .newThread(new JobQueueHandler()).start();
72     }
73
74     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker) {
75         enqueueJob(key, mainWorker, null, 0);
76     }
77
78     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
79             RollbackCallable rollbackWorker) {
80         enqueueJob(key, mainWorker, rollbackWorker, 0);
81     }
82
83     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker, int maxRetries) {
84         enqueueJob(key, mainWorker, null, maxRetries);
85     }
86
87     public void enqueueJob(AbstractDataStoreJob job) throws InvalidJobException {
88         job.validate();
89         enqueueJob(job.getJobQueueKey(), job);
90     }
91
92     /**
93      *    This is used by the external applications to enqueue a Job
94      *    with an appropriate key. A JobEntry is created and queued
95      *    appropriately.
96      */
97     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
98                            RollbackCallable rollbackWorker, int maxRetries) {
99         JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
100         Integer hashKey = getHashKey(key);
101         LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
102
103         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
104         synchronized (jobEntriesMap) {
105             JobQueue jobQueue = jobEntriesMap.get(key);
106             if (jobQueue == null) {
107                 jobQueue = new JobQueue();
108             }
109             LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
110             jobQueue.addEntry(jobEntry);
111             jobEntriesMap.put(key, jobQueue);
112
113             DataStoreJobCoordinatorCounters.jobs_pending.inc();
114             DataStoreJobCoordinatorCounters.jobs_incomplete.inc();
115             DataStoreJobCoordinatorCounters.jobs_created.inc();
116         }
117         reentrantLock.lock();
118         try {
119             waitCondition.signal();
120         } finally {
121             reentrantLock.unlock();
122         }
123     }
124
125     public long getIncompleteTaskCount() {
126         return DataStoreJobCoordinatorCounters.jobs_incomplete.get();
127     }
128
129     /**
130      * Cleanup the submitted job from the job queue.
131      **/
132     private void clearJob(JobEntry jobEntry) {
133         Integer hashKey = getHashKey(jobEntry.getKey());
134         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
135         LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
136         synchronized (jobEntriesMap) {
137             JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
138             jobQueue.setExecutingEntry(null);
139             if (jobQueue.getWaitingEntries().isEmpty()) {
140                 LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
141                 jobEntriesMap.remove(jobEntry.getKey());
142             }
143         }
144         DataStoreJobCoordinatorCounters.jobs_cleared.inc();
145         DataStoreJobCoordinatorCounters.jobs_incomplete.dec();
146     }
147
148     /**
149      * Used to generate the hashkey in to the jobQueueMap.
150      */
151     private Integer getHashKey(String key) {
152         int code = key.hashCode();
153         return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
154     }
155
156     /**
157      * JobCallback class is used as a future callback for main and rollback
158      * workers to handle success and failure.
159      */
160     private class JobCallback implements FutureCallback<List<Void>> {
161         private final JobEntry jobEntry;
162
163         JobCallback(JobEntry jobEntry) {
164             this.jobEntry = jobEntry;
165         }
166
167         /**
168          * This implies that all the future instances have returned
169          * success. -- TODO: Confirm this
170          */
171         @Override
172         public void onSuccess(List<Void> voids) {
173             LOG.trace("Job {} completed successfully", jobEntry.getKey());
174             clearJob(jobEntry);
175         }
176
177         /**
178          * This method is used to handle failure callbacks. If more
179          * retry needed, the retrycount is decremented and mainworker
180          * is executed again. After retries completed, rollbackworker
181          * is executed. If rollbackworker fails, this is a
182          * double-fault. Double fault is logged and ignored.
183          */
184         @Override
185         public void onFailure(Throwable throwable) {
186             LOG.warn("Job: {} failed", jobEntry, throwable);
187             if (jobEntry.getMainWorker() == null) {
188                 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
189                 clearJob(jobEntry);
190                 return;
191             }
192
193             int retryCount = jobEntry.decrementRetryCountAndGet();
194             if (retryCount > 0) {
195                 long waitTime = RETRY_WAIT_BASE_TIME * 10 / retryCount;
196                 scheduledExecutorService.schedule(() -> {
197                     MainTask worker = new MainTask(jobEntry);
198                     fjPool.execute(worker);
199                 }, waitTime, TimeUnit.MILLISECONDS);
200                 return;
201             }
202
203             if (jobEntry.getRollbackWorker() != null) {
204                 jobEntry.setMainWorker(null);
205                 RollbackTask rollbackTask = new RollbackTask(jobEntry);
206                 fjPool.execute(rollbackTask);
207                 return;
208             }
209
210             clearJob(jobEntry);
211         }
212     }
213
214     /**
215      * RollbackTask is used to execute the RollbackCallable provided by the
216      * application in the eventuality of a failure.
217      */
218     private class RollbackTask implements Runnable {
219         private final JobEntry jobEntry;
220
221         RollbackTask(JobEntry jobEntry) {
222             this.jobEntry = jobEntry;
223         }
224
225         @Override
226         @SuppressWarnings("checkstyle:IllegalCatch")
227         public void run() {
228             RollbackCallable callable = jobEntry.getRollbackWorker();
229             callable.setFutures(jobEntry.getFutures());
230             List<ListenableFuture<Void>> futures = null;
231
232             try {
233                 futures = callable.call();
234             } catch (Exception e) {
235                 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
236             }
237
238             if (futures == null || futures.isEmpty()) {
239                 clearJob(jobEntry);
240                 return;
241             }
242
243             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
244             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
245             jobEntry.setFutures(futures);
246         }
247     }
248
249     /**
250      * Execute the MainWorker callable.
251      */
252     private class MainTask implements Runnable {
253         private static final int LONG_JOBS_THRESHOLD = 1000; // MS
254         private final JobEntry jobEntry;
255
256         MainTask(JobEntry jobEntry) {
257             this.jobEntry = jobEntry;
258         }
259
260         @Override
261         @SuppressWarnings("checkstyle:illegalcatch")
262         public void run() {
263             List<ListenableFuture<Void>> futures = null;
264             long jobStartTimestamp = System.currentTimeMillis();
265             LOG.trace("Running job {}", jobEntry.getKey());
266
267             try {
268                 futures = jobEntry.getMainWorker().call();
269                 long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
270                 printJobs(jobEntry.getKey(), jobExecutionTime);
271             } catch (Exception e) {
272                 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
273             }
274
275             if (futures == null || futures.isEmpty()) {
276                 clearJob(jobEntry);
277                 return;
278             }
279
280             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
281             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
282             jobEntry.setFutures(futures);
283         }
284
285         private void printJobs(String key, long jobExecutionTime) {
286             if (jobExecutionTime > LONG_JOBS_THRESHOLD) {
287                 LOG.warn("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
288                 return;
289             }
290             LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
291         }
292     }
293
294     private class JobQueueHandler implements Runnable {
295         @Override
296         @SuppressWarnings("checkstyle:illegalcatch")
297         public void run() {
298             LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
299             while (true) {
300                 try {
301                     for (int i = 0; i < THREADPOOL_SIZE; i++) {
302                         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
303                         if (jobEntriesMap.isEmpty()) {
304                             continue;
305                         }
306                         LOG.trace("JobQueueHandler handling queue {} with kesy size {}. Keys: {} ", i,
307                                 jobEntriesMap.size(), jobEntriesMap.keySet());
308
309                         synchronized (jobEntriesMap) {
310                             Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
311                             while (it.hasNext()) {
312                                 Map.Entry<String, JobQueue> entry = it.next();
313                                 if (entry.getValue().getExecutingEntry() != null) {
314                                     continue;
315                                 }
316                                 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
317                                 if (jobEntry != null) {
318                                     entry.getValue().setExecutingEntry(jobEntry);
319                                     MainTask worker = new MainTask(jobEntry);
320                                     LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
321                                     fjPool.execute(worker);
322                                     DataStoreJobCoordinatorCounters.jobs_pending.dec();
323
324                                 } else {
325                                     it.remove();
326                                 }
327                             }
328                         }
329                     }
330
331                     reentrantLock.lock();
332                     try {
333                         if (isJobQueueEmpty()) {
334                             waitCondition.await();
335                         }
336                     } finally {
337                         reentrantLock.unlock();
338                     }
339                 } catch (Exception e) {
340                     LOG.error("Exception while executing the tasks", e);
341                 } catch (Throwable e) {
342                     LOG.error("Error while executing the tasks", e);
343                 }
344             }
345         }
346     }
347
348     private boolean isJobQueueEmpty() {
349         for (int i = 0; i < THREADPOOL_SIZE; i++) {
350             Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
351             if (!jobEntriesMap.isEmpty()) {
352                 return false;
353             }
354         }
355
356         return true;
357     }
358
359 }