/*
 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.genius.datastoreutils;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.infrautils.utils.concurrent.LoggingThreadUncaughtExceptionHandler;
import org.opendaylight.infrautils.utils.concurrent.ThreadFactoryProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * DataStoreJobCoordinator.
 *
 * @deprecated Use org.opendaylight.infrautils.jobcoordinator.JobCoordinator
 *             instead of this. Note that in its new incarnation it is no
 *             longer a static singleton but an OSGi service, which you can
 *             (and must) {@literal @}Inject into the classes using it.
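 *
 *             <p>A rough migration sketch follows; it is an assumption, not taken from
 *             this file: it presumes the infrautils {@code JobCoordinator} is injected
 *             into your class, so check that interface for the exact
 *             {@code enqueueJob} overloads:
 *             <pre>{@code
 *             // 'coordinator' is an @Inject-ed infrautils JobCoordinator (assumed setup)
 *             coordinator.enqueueJob(jobKey, () -> {
 *                 // a real worker returns the futures of its datastore writes
 *                 return Collections.emptyList();
 *             });
 *             }</pre>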
 */
@Deprecated
public class DataStoreJobCoordinator {

    private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);

    private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
    private static final long RETRY_WAIT_BASE_TIME = 100;

    // package local instead of private for TestDataStoreJobCoordinator
    final ForkJoinPool fjPool;
    final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();

    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
    private final ReentrantLock reentrantLock = new ReentrantLock();
    private final Condition waitCondition = reentrantLock.newCondition();

    @GuardedBy("reentrantLock")
    private boolean isJobAvailable = false;

    private static DataStoreJobCoordinator instance;

    static {
        instance = new DataStoreJobCoordinator();
    }

    public static DataStoreJobCoordinator getInstance() {
        return instance;
    }

    private DataStoreJobCoordinator() {
        fjPool = new ForkJoinPool(
                Math.min(/* MAX_CAP */ 0x7fff, Runtime.getRuntime().availableProcessors()),
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                LoggingThreadUncaughtExceptionHandler.toLogger(LOG),
                false);

        for (int i = 0; i < THREADPOOL_SIZE; i++) {
            Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
            jobQueueMap.put(i, jobEntriesMap);
        }

        ThreadFactoryProvider.builder()
            .namePrefix("DataStoreJobCoordinator-JobQueueHandler")
            .logger(LOG)
            .build().get()
        .newThread(new JobQueueHandler()).start();
    }

    /* package local */ void destroy() {
        fjPool.shutdownNow();
        scheduledExecutorService.shutdownNow();
    }

    public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker) {
        enqueueJob(key, mainWorker, null, 0);
    }

    public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
            RollbackCallable rollbackWorker) {
        enqueueJob(key, mainWorker, rollbackWorker, 0);
    }

    public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker, int maxRetries) {
        enqueueJob(key, mainWorker, null, maxRetries);
    }

    /**
     * Used by external applications to enqueue a job with an appropriate key.
     * A JobEntry is created and queued against that key.
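     *
     * <p>Usage sketch; {@code broker}, {@code interfaceName} and {@code MAX_RETRIES} are
     * illustrative names, not part of this API, and the transaction calls assume the
     * standard MD-SAL binding {@code DataBroker}:
     * <pre>{@code
     * DataStoreJobCoordinator coordinator = DataStoreJobCoordinator.getInstance();
     * coordinator.enqueueJob(interfaceName, () -> {
     *     WriteTransaction tx = broker.newWriteOnlyTransaction();
     *     // put/merge/delete the data belonging to this key here
     *     return Collections.singletonList(tx.submit());
     * }, MAX_RETRIES);
     * }</pre>
     * Jobs enqueued with the same key run one at a time in enqueue order; jobs with
     * different keys may run concurrently on the ForkJoinPool.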
     */
    public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
                           RollbackCallable rollbackWorker, int maxRetries) {
        JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
        Integer hashKey = getHashKey(key);
        LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);

        Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
        synchronized (jobEntriesMap) {
            JobQueue jobQueue = jobEntriesMap.get(key);
            if (jobQueue == null) {
                jobQueue = new JobQueue();
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
            }
            jobQueue.addEntry(jobEntry);
            jobEntriesMap.put(key, jobQueue);

            DataStoreJobCoordinatorCounters.jobs_pending.inc();
            DataStoreJobCoordinatorCounters.jobs_incomplete.inc();
            DataStoreJobCoordinatorCounters.jobs_created.inc();
        }
        signalForNextJob();
    }

    public long getIncompleteTaskCount() {
        return DataStoreJobCoordinatorCounters.jobs_incomplete.get();
    }

    /**
     * Clean up the submitted job from the job queue.
     */
    private void clearJob(JobEntry jobEntry) {
        Integer hashKey = getHashKey(jobEntry.getKey());
        Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
        LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
        synchronized (jobEntriesMap) {
            JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
            jobQueue.setExecutingEntry(null);
            if (jobQueue.getWaitingEntries().isEmpty()) {
                LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
                jobEntriesMap.remove(jobEntry.getKey());
            }
        }
        DataStoreJobCoordinatorCounters.jobs_cleared.inc();
        DataStoreJobCoordinatorCounters.jobs_incomplete.dec();
        signalForNextJob();
    }

    /**
     * Used to generate the hash key (bucket index) into the jobQueueMap.
     */
    private Integer getHashKey(String key) {
        int code = key.hashCode();
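        // The double modulo keeps the bucket index non-negative even when hashCode() is negative,
        // e.g. code == -13 with THREADPOOL_SIZE == 8: ((-13 % 8) + 8) % 8 == 3.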
        return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
    }

    /**
     * JobCallback class is used as a future callback for main and rollback
     * workers to handle success and failure.
     */
    private class JobCallback implements FutureCallback<List<Void>> {
        private final JobEntry jobEntry;

        JobCallback(JobEntry jobEntry) {
            this.jobEntry = jobEntry;
        }

        /**
         * Called when all the future instances have completed successfully. -- TODO: Confirm this
         */
        @Override
        public void onSuccess(List<Void> voids) {
            LOG.trace("Job {} completed successfully", jobEntry.getKey());
            clearJob(jobEntry);
        }

        /**
         * Handles failure callbacks. If more retries are needed, the retry count is
         * decremented and the main worker is scheduled to run again. Once the retries
         * are exhausted, the rollback worker is executed. If the rollback worker also
         * fails, that is a double fault; it is logged and the job is cleared.
         */
        @Override
        public void onFailure(Throwable throwable) {
            LOG.warn("Job: {} failed", jobEntry, throwable);
            if (jobEntry.getMainWorker() == null) {
                LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
                clearJob(jobEntry);
                return;
            }

            int retryCount = jobEntry.decrementRetryCountAndGet();
            if (retryCount > 0) {
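                // With RETRY_WAIT_BASE_TIME == 100 the delay grows as retries run out:
                // retryCount 3 -> 333 ms, 2 -> 500 ms, 1 -> 1000 ms before the MainTask is resubmitted.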
                long waitTime = RETRY_WAIT_BASE_TIME * 10 / retryCount;
                scheduledExecutorService.schedule(() -> {
                    MainTask worker = new MainTask(jobEntry);
                    fjPool.execute(worker);
                }, waitTime, TimeUnit.MILLISECONDS);
                return;
            }

            if (jobEntry.getRollbackWorker() != null) {
                jobEntry.setMainWorker(null);
                RollbackTask rollbackTask = new RollbackTask(jobEntry);
                fjPool.execute(rollbackTask);
                return;
            }

            clearJob(jobEntry);
        }
    }

    /**
     * RollbackTask is used to execute the RollbackCallable provided by the
     * application in the event of a failure.
     */
    private class RollbackTask implements Runnable {
        private final JobEntry jobEntry;

        RollbackTask(JobEntry jobEntry) {
            this.jobEntry = jobEntry;
        }

        @Override
        @SuppressWarnings("checkstyle:IllegalCatch")
        public void run() {
            RollbackCallable callable = jobEntry.getRollbackWorker();
            callable.setFutures(jobEntry.getFutures());
            List<ListenableFuture<Void>> futures = null;

            try {
                futures = callable.call();
            } catch (Exception e) {
                LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
            }

            if (futures == null || futures.isEmpty()) {
                clearJob(jobEntry);
                return;
            }

            ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
            Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
            jobEntry.setFutures(futures);
        }
    }

    /**
     * Execute the MainWorker callable.
     */
    private class MainTask implements Runnable {
        private static final int LONG_JOBS_THRESHOLD = 1000; // MS
        private final JobEntry jobEntry;

        MainTask(JobEntry jobEntry) {
            this.jobEntry = jobEntry;
        }

        @Override
        @SuppressWarnings("checkstyle:illegalcatch")
        public void run() {
            List<ListenableFuture<Void>> futures = null;
            long jobStartTimestamp = System.currentTimeMillis();
            LOG.trace("Running job {}", jobEntry.getKey());

            try {
                futures = jobEntry.getMainWorker().call();
                long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
                printJobs(jobExecutionTime);
            } catch (Throwable e) {
                LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
            }

            if (futures == null || futures.isEmpty()) {
                clearJob(jobEntry);
                return;
            }

            ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
            Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
            jobEntry.setFutures(futures);
        }

        private void printJobs(long jobExecutionTime) {
            if (jobExecutionTime > LONG_JOBS_THRESHOLD) {
                LOG.warn("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
                return;
            }
            LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
        }
    }

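    /**
     * Dispatcher loop. For every queue bucket it takes the next waiting JobEntry of each
     * JobQueue that has nothing currently executing, marks it as executing, submits a
     * MainTask for it to the ForkJoinPool, and then parks in waitForJobIfNeeded() until
     * signalForNextJob() indicates that new work has arrived.
     */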
    private class JobQueueHandler implements Runnable {
        @Override
        @SuppressWarnings("checkstyle:illegalcatch")
        public void run() {
            LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
            while (true) {
                try {
                    for (int i = 0; i < THREADPOOL_SIZE; i++) {
                        Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
                        if (jobEntriesMap.isEmpty()) {
                            continue;
                        }
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("JobQueueHandler handling queue {} with key size {}. Keys: {} ", i,
                                jobEntriesMap.size(), jobEntriesMap.keySet());
                        }

                        synchronized (jobEntriesMap) {
                            Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
                            while (it.hasNext()) {
                                Map.Entry<String, JobQueue> entry = it.next();
                                JobEntry executingEntry = entry.getValue().getExecutingEntry();
                                if (executingEntry != null) {
                                    LOG.trace("Job is under execution {}", executingEntry);
                                    continue;
                                }
                                JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
                                if (jobEntry != null) {
                                    entry.getValue().setExecutingEntry(jobEntry);
                                    MainTask worker = new MainTask(jobEntry);
                                    LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
                                    fjPool.execute(worker);
                                    DataStoreJobCoordinatorCounters.jobs_pending.dec();
                                } else {
                                    it.remove();
                                }
                            }
                        }
                    }
                    waitForJobIfNeeded();
                } catch (Exception e) {
                    LOG.error("Exception while executing the tasks", e);
                } catch (Throwable e) {
                    LOG.error("Error while executing the tasks", e);
                }
            }
        }
    }

    private boolean isJobQueueEmpty() {
        for (int i = 0; i < THREADPOOL_SIZE; i++) {
            Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
            if (!jobEntriesMap.isEmpty()) {
                return false;
            }
        }

        return true;
    }

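    // Handshake between producers (enqueueJob()/clearJob()) and the JobQueueHandler thread:
    // signalForNextJob() sets the flag and wakes the handler; waitForJobIfNeeded() parks for
    // up to a second at a time until new work is signalled.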
    private void signalForNextJob() {
        reentrantLock.lock();
        try {
            isJobAvailable = true;
            waitCondition.signalAll();
        } finally {
            reentrantLock.unlock();
        }
    }

    private void waitForJobIfNeeded() throws InterruptedException {
        reentrantLock.lock();
        try {
            while (!isJobAvailable) {
                waitCondition.await(1, TimeUnit.SECONDS);
            }
            isJobAvailable = false;
        } finally {
            reentrantLock.unlock();
        }
    }
}