2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.datastoreutils;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.infrautils.utils.concurrent.LoggingThreadUncaughtExceptionHandler;
import org.opendaylight.infrautils.utils.concurrent.ThreadFactoryProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * DataStoreJobCoordinator.
 *
 * @deprecated Use org.opendaylight.infrautils.jobcoordinator.JobCoordinator
 *             instead of this. Please note that in its new incarnation it is no
 *             longer a static singleton but an OSGi service, which you can
 *             (must) {@literal @}Inject into the class using it.
 */
41 public class DataStoreJobCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);

/** Number of job buckets in {@link #jobQueueMap}; read once from the CPU count at class load. */
private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
/** Base value, in milliseconds, from which the delay before re-running a failed job is derived. */
private static final long RETRY_WAIT_BASE_TIME = 100;

// package local instead of private for TestDataStoreJobCoordinator
final ForkJoinPool fjPool;
/** Bucket index (as produced by getHashKey) mapped to the per-job-key queues that fall into that bucket. */
final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();

/** Runs the delayed re-submission of jobs that failed and still have retries left. */
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
private final ReentrantLock reentrantLock = new ReentrantLock();
private final Condition waitCondition = reentrantLock.newCondition();

/** Set by signalForNextJob(), consumed and reset by waitForJobIfNeeded(). */
@GuardedBy("reentrantLock")
private boolean isJobAvailable = false;

private static DataStoreJobCoordinator instance;

static {
    // Keep this initializer textually after LOG and THREADPOOL_SIZE: the constructor reads both,
    // and static initializers run in declaration order.
    instance = new DataStoreJobCoordinator();
}
65 public static DataStoreJobCoordinator getInstance() {
/**
 * Creates the worker pool, pre-populates one (initially empty) job bucket per hash slot and
 * starts the dispatcher thread that feeds queued jobs to the pool.
 */
private DataStoreJobCoordinator() {
    // Use THREADPOOL_SIZE rather than querying the CPU count a second time, so the pool
    // parallelism and the number of buckets created below can never disagree.
    fjPool = new ForkJoinPool(
            Math.min(/* MAX_CAP */ 0x7fff, THREADPOOL_SIZE),
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            LoggingThreadUncaughtExceptionHandler.toLogger(LOG),
            // NOTE(review): asyncMode argument restored as false (the ForkJoinPool default); confirm.
            false);

    for (int i = 0; i < THREADPOOL_SIZE; i++) {
        Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
        jobQueueMap.put(i, jobEntriesMap);
    }

    // NOTE(review): the logger(...)/build().get() steps of the builder chain are restored from the
    // infrautils ThreadFactoryProvider API; confirm against the version in use.
    ThreadFactoryProvider.builder()
            .namePrefix("DataStoreJobCoordinator-JobQueueHandler")
            .logger(LOG)
            .build().get()
            .newThread(new JobQueueHandler()).start();
}
88 /* package local */ void destroy() {
90 scheduledExecutorService.shutdownNow();
93 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker) {
94 enqueueJob(key, mainWorker, null, 0);
97 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
98 RollbackCallable rollbackWorker) {
99 enqueueJob(key, mainWorker, rollbackWorker, 0);
102 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker, int maxRetries) {
103 enqueueJob(key, mainWorker, null, maxRetries);
107 * This is used by the external applications to enqueue a Job
108 * with an appropriate key. A JobEntry is created and queued
111 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
112 RollbackCallable rollbackWorker, int maxRetries) {
113 JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
114 Integer hashKey = getHashKey(key);
115 LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
117 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
118 synchronized (jobEntriesMap) {
119 JobQueue jobQueue = jobEntriesMap.get(key);
120 if (jobQueue == null) {
121 jobQueue = new JobQueue();
123 if (LOG.isTraceEnabled()) {
124 LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
126 jobQueue.addEntry(jobEntry);
127 jobEntriesMap.put(key, jobQueue);
129 DataStoreJobCoordinatorCounters.jobs_pending.inc();
130 DataStoreJobCoordinatorCounters.jobs_incomplete.inc();
131 DataStoreJobCoordinatorCounters.jobs_created.inc();
136 public long getIncompleteTaskCount() {
137 return DataStoreJobCoordinatorCounters.jobs_incomplete.get();
141 * Cleanup the submitted job from the job queue.
143 private void clearJob(JobEntry jobEntry) {
144 Integer hashKey = getHashKey(jobEntry.getKey());
145 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
146 LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
147 synchronized (jobEntriesMap) {
148 JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
149 jobQueue.setExecutingEntry(null);
150 if (jobQueue.getWaitingEntries().isEmpty()) {
151 LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
152 jobEntriesMap.remove(jobEntry.getKey());
155 DataStoreJobCoordinatorCounters.jobs_cleared.inc();
156 DataStoreJobCoordinatorCounters.jobs_incomplete.dec();
161 * Used to generate the hashkey in to the jobQueueMap.
163 private Integer getHashKey(String key) {
164 int code = key.hashCode();
165 return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
169 * JobCallback class is used as a future callback for main and rollback
170 * workers to handle success and failure.
172 private class JobCallback implements FutureCallback<List<Void>> {
173 private final JobEntry jobEntry;
175 JobCallback(JobEntry jobEntry) {
176 this.jobEntry = jobEntry;
180 * This implies that all the future instances have returned
181 * success. -- TODO: Confirm this
184 public void onSuccess(List<Void> voids) {
185 LOG.trace("Job {} completed successfully", jobEntry.getKey());
190 * This method is used to handle failure callbacks. If more
191 * retry needed, the retrycount is decremented and mainworker
192 * is executed again. After retries completed, rollbackworker
193 * is executed. If rollbackworker fails, this is a
194 * double-fault. Double fault is logged and ignored.
197 public void onFailure(Throwable throwable) {
198 LOG.warn("Job: {} failed", jobEntry, throwable);
199 if (jobEntry.getMainWorker() == null) {
200 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
205 int retryCount = jobEntry.decrementRetryCountAndGet();
206 if (retryCount > 0) {
207 long waitTime = RETRY_WAIT_BASE_TIME * 10 / retryCount;
208 scheduledExecutorService.schedule(() -> {
209 MainTask worker = new MainTask(jobEntry);
210 fjPool.execute(worker);
211 }, waitTime, TimeUnit.MILLISECONDS);
215 if (jobEntry.getRollbackWorker() != null) {
216 jobEntry.setMainWorker(null);
217 RollbackTask rollbackTask = new RollbackTask(jobEntry);
218 fjPool.execute(rollbackTask);
227 * RollbackTask is used to execute the RollbackCallable provided by the
228 * application in the eventuality of a failure.
230 private class RollbackTask implements Runnable {
231 private final JobEntry jobEntry;
233 RollbackTask(JobEntry jobEntry) {
234 this.jobEntry = jobEntry;
238 @SuppressWarnings("checkstyle:IllegalCatch")
240 RollbackCallable callable = jobEntry.getRollbackWorker();
241 callable.setFutures(jobEntry.getFutures());
242 List<ListenableFuture<Void>> futures = null;
245 futures = callable.call();
246 } catch (Exception e) {
247 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
250 if (futures == null || futures.isEmpty()) {
255 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
256 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
257 jobEntry.setFutures(futures);
262 * Execute the MainWorker callable.
264 private class MainTask implements Runnable {
265 private static final int LONG_JOBS_THRESHOLD = 1000; // MS
266 private final JobEntry jobEntry;
268 MainTask(JobEntry jobEntry) {
269 this.jobEntry = jobEntry;
273 @SuppressWarnings("checkstyle:illegalcatch")
275 List<ListenableFuture<Void>> futures = null;
276 long jobStartTimestamp = System.currentTimeMillis();
277 LOG.trace("Running job {}", jobEntry.getKey());
280 futures = jobEntry.getMainWorker().call();
281 long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
282 printJobs(jobExecutionTime);
283 } catch (Throwable e) {
284 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
287 if (futures == null || futures.isEmpty()) {
292 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
293 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
294 jobEntry.setFutures(futures);
297 private void printJobs(long jobExecutionTime) {
298 if (jobExecutionTime > LONG_JOBS_THRESHOLD) {
299 LOG.warn("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
302 LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
306 private class JobQueueHandler implements Runnable {
308 @SuppressWarnings("checkstyle:illegalcatch")
310 LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
313 for (int i = 0; i < THREADPOOL_SIZE; i++) {
314 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
315 if (jobEntriesMap.isEmpty()) {
318 if (LOG.isTraceEnabled()) {
319 LOG.trace("JobQueueHandler handling queue {} with key size {}. Keys: {} ", i,
320 jobEntriesMap.size(), jobEntriesMap.keySet());
323 synchronized (jobEntriesMap) {
324 Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
325 while (it.hasNext()) {
326 Map.Entry<String, JobQueue> entry = it.next();
327 JobEntry executingEntry = entry.getValue().getExecutingEntry();
328 if (executingEntry != null) {
329 LOG.trace("Job is under execution {}", executingEntry);
332 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
333 if (jobEntry != null) {
334 entry.getValue().setExecutingEntry(jobEntry);
335 MainTask worker = new MainTask(jobEntry);
336 LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
337 fjPool.execute(worker);
338 DataStoreJobCoordinatorCounters.jobs_pending.dec();
346 waitForJobIfNeeded();
347 } catch (Exception e) {
348 LOG.error("Exception while executing the tasks", e);
349 } catch (Throwable e) {
350 LOG.error("Error while executing the tasks", e);
356 private boolean isJobQueueEmpty() {
357 for (int i = 0; i < THREADPOOL_SIZE; i++) {
358 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
359 if (!jobEntriesMap.isEmpty()) {
368 private void signalForNextJob() {
369 reentrantLock.lock();
371 isJobAvailable = true;
372 waitCondition.signalAll();
374 reentrantLock.unlock();
378 private void waitForJobIfNeeded() throws InterruptedException {
379 reentrantLock.lock();
381 while (!isJobAvailable) {
382 waitCondition.await(1, TimeUnit.SECONDS);
384 isJobAvailable = false;
386 reentrantLock.unlock();