Merge "Bug 8111: Conflicting modification for path"
[genius.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / genius / datastoreutils / DataStoreJobCoordinator.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.datastoreutils;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ForkJoinPool;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.ReentrantLock;
25 import org.opendaylight.infrautils.utils.concurrent.LoggingThreadUncaughtExceptionHandler;
26 import org.opendaylight.infrautils.utils.concurrent.ThreadFactoryProvider;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * DataStoreJobCoordinator.
32  *
33  * @deprecated Use org.opendaylight.infrautils.jobcoordinator.JobCoordinator
34  *             instead of this. Please note that in its new incarnation it is no
35  *             longer a static singleton but an OSGi service, which you can
36  *             (must) {@literal @}Inject into the class using it.
37  */
38 @Deprecated
39 public class DataStoreJobCoordinator {
40
41     private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
42
43     private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
44     private static final long RETRY_WAIT_BASE_TIME = 100;
45
46     // package local instead of private for TestDataStoreJobCoordinator
47     final ForkJoinPool fjPool;
48     final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
49
50     private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
51     private final ReentrantLock reentrantLock = new ReentrantLock();
52     private final Condition waitCondition = reentrantLock.newCondition();
53
54     private static DataStoreJobCoordinator instance;
55
56     static {
57         instance = new DataStoreJobCoordinator();
58     }
59
60     public static DataStoreJobCoordinator getInstance() {
61         return instance;
62     }
63
64     private DataStoreJobCoordinator() {
65         fjPool = new ForkJoinPool(
66                 Math.min(/* MAX_CAP */ 0x7fff, Runtime.getRuntime().availableProcessors()),
67                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
68                 LoggingThreadUncaughtExceptionHandler.toLogger(LOG),
69                 false);
70
71         for (int i = 0; i < THREADPOOL_SIZE; i++) {
72             Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
73             jobQueueMap.put(i, jobEntriesMap);
74         }
75
76         ThreadFactoryProvider.builder()
77             .namePrefix("DataStoreJobCoordinator-JobQueueHandler")
78             .logger(LOG)
79             .build().get()
80         .newThread(new JobQueueHandler()).start();
81     }
82
83     /* package local */ void destroy() {
84         fjPool.shutdownNow();
85         scheduledExecutorService.shutdownNow();
86     }
87
88     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker) {
89         enqueueJob(key, mainWorker, null, 0);
90     }
91
92     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
93             RollbackCallable rollbackWorker) {
94         enqueueJob(key, mainWorker, rollbackWorker, 0);
95     }
96
97     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker, int maxRetries) {
98         enqueueJob(key, mainWorker, null, maxRetries);
99     }
100
101     public void enqueueJob(AbstractDataStoreJob job) throws InvalidJobException {
102         job.validate();
103         enqueueJob(job.getJobQueueKey(), job);
104     }
105
106     /**
107      *    This is used by the external applications to enqueue a Job
108      *    with an appropriate key. A JobEntry is created and queued
109      *    appropriately.
110      */
111     public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
112                            RollbackCallable rollbackWorker, int maxRetries) {
113         JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
114         Integer hashKey = getHashKey(key);
115         LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
116
117         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
118         synchronized (jobEntriesMap) {
119             JobQueue jobQueue = jobEntriesMap.get(key);
120             if (jobQueue == null) {
121                 jobQueue = new JobQueue();
122             }
123             LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
124             jobQueue.addEntry(jobEntry);
125             jobEntriesMap.put(key, jobQueue);
126
127             DataStoreJobCoordinatorCounters.jobs_pending.inc();
128             DataStoreJobCoordinatorCounters.jobs_incomplete.inc();
129             DataStoreJobCoordinatorCounters.jobs_created.inc();
130         }
131         reentrantLock.lock();
132         try {
133             waitCondition.signal();
134         } finally {
135             reentrantLock.unlock();
136         }
137     }
138
139     public long getIncompleteTaskCount() {
140         return DataStoreJobCoordinatorCounters.jobs_incomplete.get();
141     }
142
143     /**
144      * Cleanup the submitted job from the job queue.
145      **/
146     private void clearJob(JobEntry jobEntry) {
147         Integer hashKey = getHashKey(jobEntry.getKey());
148         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
149         LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
150         synchronized (jobEntriesMap) {
151             JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
152             jobQueue.setExecutingEntry(null);
153             if (jobQueue.getWaitingEntries().isEmpty()) {
154                 LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
155                 jobEntriesMap.remove(jobEntry.getKey());
156             }
157         }
158         DataStoreJobCoordinatorCounters.jobs_cleared.inc();
159         DataStoreJobCoordinatorCounters.jobs_incomplete.dec();
160     }
161
162     /**
163      * Used to generate the hashkey in to the jobQueueMap.
164      */
165     private Integer getHashKey(String key) {
166         int code = key.hashCode();
167         return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
168     }
169
170     /**
171      * JobCallback class is used as a future callback for main and rollback
172      * workers to handle success and failure.
173      */
174     private class JobCallback implements FutureCallback<List<Void>> {
175         private final JobEntry jobEntry;
176
177         JobCallback(JobEntry jobEntry) {
178             this.jobEntry = jobEntry;
179         }
180
181         /**
182          * This implies that all the future instances have returned
183          * success. -- TODO: Confirm this
184          */
185         @Override
186         public void onSuccess(List<Void> voids) {
187             LOG.trace("Job {} completed successfully", jobEntry.getKey());
188             clearJob(jobEntry);
189         }
190
191         /**
192          * This method is used to handle failure callbacks. If more
193          * retry needed, the retrycount is decremented and mainworker
194          * is executed again. After retries completed, rollbackworker
195          * is executed. If rollbackworker fails, this is a
196          * double-fault. Double fault is logged and ignored.
197          */
198         @Override
199         public void onFailure(Throwable throwable) {
200             LOG.warn("Job: {} failed", jobEntry, throwable);
201             if (jobEntry.getMainWorker() == null) {
202                 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
203                 clearJob(jobEntry);
204                 return;
205             }
206
207             int retryCount = jobEntry.decrementRetryCountAndGet();
208             if (retryCount > 0) {
209                 long waitTime = RETRY_WAIT_BASE_TIME * 10 / retryCount;
210                 scheduledExecutorService.schedule(() -> {
211                     MainTask worker = new MainTask(jobEntry);
212                     fjPool.execute(worker);
213                 }, waitTime, TimeUnit.MILLISECONDS);
214                 return;
215             }
216
217             if (jobEntry.getRollbackWorker() != null) {
218                 jobEntry.setMainWorker(null);
219                 RollbackTask rollbackTask = new RollbackTask(jobEntry);
220                 fjPool.execute(rollbackTask);
221                 return;
222             }
223
224             clearJob(jobEntry);
225         }
226     }
227
228     /**
229      * RollbackTask is used to execute the RollbackCallable provided by the
230      * application in the eventuality of a failure.
231      */
232     private class RollbackTask implements Runnable {
233         private final JobEntry jobEntry;
234
235         RollbackTask(JobEntry jobEntry) {
236             this.jobEntry = jobEntry;
237         }
238
239         @Override
240         @SuppressWarnings("checkstyle:IllegalCatch")
241         public void run() {
242             RollbackCallable callable = jobEntry.getRollbackWorker();
243             callable.setFutures(jobEntry.getFutures());
244             List<ListenableFuture<Void>> futures = null;
245
246             try {
247                 futures = callable.call();
248             } catch (Exception e) {
249                 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
250             }
251
252             if (futures == null || futures.isEmpty()) {
253                 clearJob(jobEntry);
254                 return;
255             }
256
257             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
258             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
259             jobEntry.setFutures(futures);
260         }
261     }
262
263     /**
264      * Execute the MainWorker callable.
265      */
266     private class MainTask implements Runnable {
267         private static final int LONG_JOBS_THRESHOLD = 1000; // MS
268         private final JobEntry jobEntry;
269
270         MainTask(JobEntry jobEntry) {
271             this.jobEntry = jobEntry;
272         }
273
274         @Override
275         @SuppressWarnings("checkstyle:illegalcatch")
276         public void run() {
277             List<ListenableFuture<Void>> futures = null;
278             long jobStartTimestamp = System.currentTimeMillis();
279             LOG.trace("Running job {}", jobEntry.getKey());
280
281             try {
282                 futures = jobEntry.getMainWorker().call();
283                 long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
284                 printJobs(jobEntry.getKey(), jobExecutionTime);
285             } catch (Exception e) {
286                 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
287             }
288
289             if (futures == null || futures.isEmpty()) {
290                 clearJob(jobEntry);
291                 return;
292             }
293
294             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
295             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
296             jobEntry.setFutures(futures);
297         }
298
299         private void printJobs(String key, long jobExecutionTime) {
300             if (jobExecutionTime > LONG_JOBS_THRESHOLD) {
301                 LOG.warn("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
302                 return;
303             }
304             LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
305         }
306     }
307
308     private class JobQueueHandler implements Runnable {
309         @Override
310         @SuppressWarnings("checkstyle:illegalcatch")
311         public void run() {
312             LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
313             while (true) {
314                 try {
315                     for (int i = 0; i < THREADPOOL_SIZE; i++) {
316                         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
317                         if (jobEntriesMap.isEmpty()) {
318                             continue;
319                         }
320                         LOG.trace("JobQueueHandler handling queue {} with kesy size {}. Keys: {} ", i,
321                                 jobEntriesMap.size(), jobEntriesMap.keySet());
322
323                         synchronized (jobEntriesMap) {
324                             Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
325                             while (it.hasNext()) {
326                                 Map.Entry<String, JobQueue> entry = it.next();
327                                 if (entry.getValue().getExecutingEntry() != null) {
328                                     continue;
329                                 }
330                                 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
331                                 if (jobEntry != null) {
332                                     entry.getValue().setExecutingEntry(jobEntry);
333                                     MainTask worker = new MainTask(jobEntry);
334                                     LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
335                                     fjPool.execute(worker);
336                                     DataStoreJobCoordinatorCounters.jobs_pending.dec();
337
338                                 } else {
339                                     it.remove();
340                                 }
341                             }
342                         }
343                     }
344
345                     reentrantLock.lock();
346                     try {
347                         if (isJobQueueEmpty()) {
348                             waitCondition.await();
349                         }
350                     } finally {
351                         reentrantLock.unlock();
352                     }
353                 } catch (Exception e) {
354                     LOG.error("Exception while executing the tasks", e);
355                 } catch (Throwable e) {
356                     LOG.error("Error while executing the tasks", e);
357                 }
358             }
359         }
360     }
361
362     private boolean isJobQueueEmpty() {
363         for (int i = 0; i < THREADPOOL_SIZE; i++) {
364             Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
365             if (!jobEntriesMap.isEmpty()) {
366                 return false;
367             }
368         }
369
370         return true;
371     }
372
373 }