Merge "Fix the JMXAlarmAgent"
[genius.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / genius / datastoreutils / DataStoreJobCoordinator.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.datastoreutils;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Arrays;
18 import java.util.Collections;
19 import java.util.concurrent.Callable;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ForkJoinPool;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.locks.Condition;
26 import java.util.concurrent.locks.ReentrantLock;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 public class DataStoreJobCoordinator {
31
32     private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
33
34     private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
35     private static final long RETRY_WAIT_BASE_TIME = 100;
36
37     // package local instead of private for TestDataStoreJobCoordinator
38     final ForkJoinPool fjPool;
39     final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
40
41     private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
42     private final ReentrantLock reentrantLock = new ReentrantLock();
43     private final Condition waitCondition = reentrantLock.newCondition();
44
45     private static DataStoreJobCoordinator instance;
46
47     static {
48         instance = new DataStoreJobCoordinator();
49     }
50
51     public static DataStoreJobCoordinator getInstance() {
52         return instance;
53     }
54
55     private DataStoreJobCoordinator() {
56         fjPool = new ForkJoinPool();
57
58         for (int i = 0; i < THREADPOOL_SIZE; i++) {
59             Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
60             jobQueueMap.put(i, jobEntriesMap);
61         }
62
63         new Thread(new JobQueueHandler()).start();
64     }
65
66     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker) {
67         enqueueJob(key, mainWorker, null, 0);
68     }
69
70     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
71             RollbackCallable rollbackWorker) {
72         enqueueJob(key, mainWorker, rollbackWorker, 0);
73     }
74
75     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker, int maxRetries) {
76         enqueueJob(key, mainWorker, null, maxRetries);
77     }
78
79     public void enqueueJob(AbstractDataStoreJob job) throws InvalidJobException {
80         job.validate();
81         enqueueJob(job.getJobQueueKey(), job);
82     }
83
84     /**
85      *    This is used by the external applications to enqueue a Job
86      *    with an appropriate key. A JobEntry is created and queued
87      *    appropriately.
88      */
89     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
90                            RollbackCallable rollbackWorker, int maxRetries) {
91         JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
92         Integer hashKey = getHashKey(key);
93         LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
94
95         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
96         synchronized (jobEntriesMap) {
97             JobQueue jobQueue = jobEntriesMap.get(key);
98             if (jobQueue == null) {
99                 jobQueue = new JobQueue();
100             }
101             LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
102             jobQueue.addEntry(jobEntry);
103             jobEntriesMap.put(key, jobQueue);
104
105             DataStoreJobCoordinatorCounters.jobs_pending.inc();
106         }
107         reentrantLock.lock();
108         try {
109             waitCondition.signal();
110         } finally {
111             reentrantLock.unlock();
112         }
113     }
114
115     /**
116      * Cleanup the submitted job from the job queue.
117      **/
118     private void clearJob(JobEntry jobEntry) {
119         Integer hashKey = getHashKey(jobEntry.getKey());
120         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
121         LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
122         synchronized (jobEntriesMap) {
123             JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
124             jobQueue.setExecutingEntry(null);
125             if (jobQueue.getWaitingEntries().isEmpty()) {
126                 LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
127                 jobEntriesMap.remove(jobEntry.getKey());
128             }
129         }
130         DataStoreJobCoordinatorCounters.jobs_cleared.inc();
131     }
132
133     /**
134      * Used to generate the hashkey in to the jobQueueMap.
135      */
136     private Integer getHashKey(String key) {
137         int code = key.hashCode();
138         return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
139     }
140
141     /**
142      * JobCallback class is used as a future callback for main and rollback
143      * workers to handle success and failure.
144      */
145     private class JobCallback implements FutureCallback<List<Void>> {
146         private final JobEntry jobEntry;
147
148         JobCallback(JobEntry jobEntry) {
149             this.jobEntry = jobEntry;
150         }
151
152         /**
153          * This implies that all the future instances have returned
154          * success. -- TODO: Confirm this
155          */
156         @Override
157         public void onSuccess(List<Void> voids) {
158             LOG.trace("Job {} completed successfully", jobEntry.getKey());
159             clearJob(jobEntry);
160         }
161
162         /**
163          *    This method is used to handle failure callbacks. If more
164          *    retry needed, the retrycount is decremented and mainworker
165          *    is executed again. After retries completed, rollbackworker
166          *    is executed. If rollbackworker fails, this is a
167          *    double-fault. Double fault is logged and ignored.
168          */
169         @Override
170         public void onFailure(Throwable throwable) {
171             LOG.warn("Job: {} failed with exception: {} {}", jobEntry, throwable.getClass().getSimpleName(),
172                     throwable.getStackTrace());
173             if (jobEntry.getMainWorker() == null) {
174                 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
175                 clearJob(jobEntry);
176                 return;
177             }
178
179             int retryCount = jobEntry.decrementRetryCountAndGet();
180             if (retryCount > 0) {
181                 long waitTime = (RETRY_WAIT_BASE_TIME * 10) / retryCount;
182                 scheduledExecutorService.schedule(() -> {
183                     MainTask worker = new MainTask(jobEntry);
184                     fjPool.execute(worker);
185                     }, waitTime, TimeUnit.MILLISECONDS);
186                 return;
187             }
188
189             if (jobEntry.getRollbackWorker() != null) {
190                 jobEntry.setMainWorker(null);
191                 RollbackTask rollbackTask = new RollbackTask(jobEntry);
192                 fjPool.execute(rollbackTask);
193                 return;
194             }
195
196             clearJob(jobEntry);
197         }
198     }
199
200     /**
201      * RollbackTask is used to execute the RollbackCallable provided by the
202      * application in the eventuality of a failure.
203      */
204     private class RollbackTask implements Runnable {
205         private final JobEntry jobEntry;
206
207         RollbackTask(JobEntry jobEntry) {
208             this.jobEntry = jobEntry;
209         }
210
211         @Override
212         @SuppressWarnings("checkstyle:illegalcatch")
213         public void run() {
214             RollbackCallable callable = jobEntry.getRollbackWorker();
215             callable.setFutures(jobEntry.getFutures());
216             List<ListenableFuture<Void>> futures = null;
217
218             try {
219                 futures = callable.call();
220             } catch (Exception e) {
221                 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
222             }
223
224             if (futures == null || futures.isEmpty()) {
225                 clearJob(jobEntry);
226                 return;
227             }
228
229             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
230             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
231             jobEntry.setFutures(futures);
232         }
233     }
234
235     /**
236      * Execute the MainWorker callable.
237      */
238     private class MainTask implements Runnable {
239         private static final int LONG_JOBS_THRESHOLD = 1000; // MS
240         private final JobEntry jobEntry;
241
242         MainTask(JobEntry jobEntry) {
243             this.jobEntry = jobEntry;
244         }
245
246         @Override
247         @SuppressWarnings("checkstyle:illegalcatch")
248         public void run() {
249             List<ListenableFuture<Void>> futures = null;
250             long jobStartTimestamp = System.currentTimeMillis();
251             LOG.trace("Running job {}", jobEntry.getKey());
252
253             try {
254                 futures = jobEntry.getMainWorker().call();
255                 long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
256                 printJobs(jobEntry.getKey(), jobExecutionTime);
257             } catch (Exception e) {
258                 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
259             }
260
261             if (futures == null || futures.isEmpty()) {
262                 clearJob(jobEntry);
263                 return;
264             }
265
266             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
267             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
268             jobEntry.setFutures(futures);
269         }
270
271         private void printJobs(String key, long jobExecutionTime) {
272             if (jobExecutionTime > LONG_JOBS_THRESHOLD) {
273                 LOG.warn("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
274                 return;
275             }
276             LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
277         }
278     }
279
280     private class JobQueueHandler implements Runnable {
281         @Override
282         @SuppressWarnings("checkstyle:illegalcatch")
283         public void run() {
284             LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
285             while (true) {
286                 try {
287                     for (int i = 0; i < THREADPOOL_SIZE; i++) {
288                         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
289                         if (jobEntriesMap.isEmpty()) {
290                             continue;
291                         }
292                         LOG.trace("JobQueueHandler handling queue {} with kesy size {}. Keys: {} ", i,
293                                 jobEntriesMap.size(), Arrays.toString(jobEntriesMap.keySet().toArray()));
294
295                         synchronized (jobEntriesMap) {
296                             Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
297                             while (it.hasNext()) {
298                                 Map.Entry<String, JobQueue> entry = it.next();
299                                 if (entry.getValue().getExecutingEntry() != null) {
300                                     continue;
301                                 }
302                                 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
303                                 if (jobEntry != null) {
304                                     entry.getValue().setExecutingEntry(jobEntry);
305                                     MainTask worker = new MainTask(jobEntry);
306                                     LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
307                                     fjPool.execute(worker);
308                                     DataStoreJobCoordinatorCounters.jobs_pending.dec();
309
310                                 } else {
311                                     it.remove();
312                                     DataStoreJobCoordinatorCounters.jobs_remove_entry.inc();
313                                 }
314                             }
315                         }
316                     }
317
318                     reentrantLock.lock();
319                     try {
320                         if (isJobQueueEmpty()) {
321                             waitCondition.await();
322                         }
323                     } finally {
324                         reentrantLock.unlock();
325                     }
326                 } catch (Exception e) {
327                     LOG.error("Exception while executing the tasks {} ", e);
328                 } catch (Throwable e) {
329                     LOG.error("Error while executing the tasks {} ", e);
330                 }
331             }
332         }
333     }
334
335     private boolean isJobQueueEmpty() {
336         for (int i = 0; i < THREADPOOL_SIZE; i++) {
337             Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
338             if (!jobEntriesMap.isEmpty()) {
339                 return false;
340             }
341         }
342
343         return true;
344     }
345
346 }