2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.datastoreutils;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.Iterator;
15 import java.util.List;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ForkJoinPool;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.ReentrantLock;
25 import org.opendaylight.genius.infra.LoggingThreadUncaughtExceptionHandler;
26 import org.opendaylight.genius.infra.ThreadFactoryProvider;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * DataStoreJobCoordinator.
33 * @deprecated Use org.opendaylight.infrautils.jobcoordinator.JobCoordinator
34 * instead of this. Please note that in its new reincarnation its no
35 * longer a static singleton but now an OSGi service which can
36 * (must) {@literal @}Inject into your class using it.
39 public class DataStoreJobCoordinator {
41 private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
43 private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
44 private static final long RETRY_WAIT_BASE_TIME = 100;
46 // package local instead of private for TestDataStoreJobCoordinator
47 final ForkJoinPool fjPool;
48 final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
50 private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
51 private final ReentrantLock reentrantLock = new ReentrantLock();
52 private final Condition waitCondition = reentrantLock.newCondition();
54 private static DataStoreJobCoordinator instance;
57 instance = new DataStoreJobCoordinator();
60 public static DataStoreJobCoordinator getInstance() {
64 private DataStoreJobCoordinator() {
65 fjPool = new ForkJoinPool(
66 Math.min(/* MAX_CAP */ 0x7fff, Runtime.getRuntime().availableProcessors()),
67 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
68 LoggingThreadUncaughtExceptionHandler.toLOG(LOG),
71 for (int i = 0; i < THREADPOOL_SIZE; i++) {
72 Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
73 jobQueueMap.put(i, jobEntriesMap);
76 ThreadFactoryProvider.builder()
77 .namePrefix("DataStoreJobCoordinator-JobQueueHandler")
80 .newThread(new JobQueueHandler()).start();
83 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker) {
84 enqueueJob(key, mainWorker, null, 0);
87 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
88 RollbackCallable rollbackWorker) {
89 enqueueJob(key, mainWorker, rollbackWorker, 0);
92 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker, int maxRetries) {
93 enqueueJob(key, mainWorker, null, maxRetries);
96 public void enqueueJob(AbstractDataStoreJob job) throws InvalidJobException {
98 enqueueJob(job.getJobQueueKey(), job);
102 * This is used by the external applications to enqueue a Job
103 * with an appropriate key. A JobEntry is created and queued
106 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
107 RollbackCallable rollbackWorker, int maxRetries) {
108 JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
109 Integer hashKey = getHashKey(key);
110 LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
112 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
113 synchronized (jobEntriesMap) {
114 JobQueue jobQueue = jobEntriesMap.get(key);
115 if (jobQueue == null) {
116 jobQueue = new JobQueue();
118 LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
119 jobQueue.addEntry(jobEntry);
120 jobEntriesMap.put(key, jobQueue);
122 DataStoreJobCoordinatorCounters.jobs_pending.inc();
123 DataStoreJobCoordinatorCounters.jobs_incomplete.inc();
124 DataStoreJobCoordinatorCounters.jobs_created.inc();
126 reentrantLock.lock();
128 waitCondition.signal();
130 reentrantLock.unlock();
134 public long getIncompleteTaskCount() {
135 return DataStoreJobCoordinatorCounters.jobs_incomplete.get();
139 * Cleanup the submitted job from the job queue.
141 private void clearJob(JobEntry jobEntry) {
142 Integer hashKey = getHashKey(jobEntry.getKey());
143 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
144 LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
145 synchronized (jobEntriesMap) {
146 JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
147 jobQueue.setExecutingEntry(null);
148 if (jobQueue.getWaitingEntries().isEmpty()) {
149 LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
150 jobEntriesMap.remove(jobEntry.getKey());
153 DataStoreJobCoordinatorCounters.jobs_cleared.inc();
154 DataStoreJobCoordinatorCounters.jobs_incomplete.dec();
158 * Used to generate the hashkey in to the jobQueueMap.
160 private Integer getHashKey(String key) {
161 int code = key.hashCode();
162 return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
166 * JobCallback class is used as a future callback for main and rollback
167 * workers to handle success and failure.
169 private class JobCallback implements FutureCallback<List<Void>> {
170 private final JobEntry jobEntry;
172 JobCallback(JobEntry jobEntry) {
173 this.jobEntry = jobEntry;
177 * This implies that all the future instances have returned
178 * success. -- TODO: Confirm this
181 public void onSuccess(List<Void> voids) {
182 LOG.trace("Job {} completed successfully", jobEntry.getKey());
187 * This method is used to handle failure callbacks. If more
188 * retry needed, the retrycount is decremented and mainworker
189 * is executed again. After retries completed, rollbackworker
190 * is executed. If rollbackworker fails, this is a
191 * double-fault. Double fault is logged and ignored.
194 public void onFailure(Throwable throwable) {
195 LOG.warn("Job: {} failed", jobEntry, throwable);
196 if (jobEntry.getMainWorker() == null) {
197 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
202 int retryCount = jobEntry.decrementRetryCountAndGet();
203 if (retryCount > 0) {
204 long waitTime = RETRY_WAIT_BASE_TIME * 10 / retryCount;
205 scheduledExecutorService.schedule(() -> {
206 MainTask worker = new MainTask(jobEntry);
207 fjPool.execute(worker);
208 }, waitTime, TimeUnit.MILLISECONDS);
212 if (jobEntry.getRollbackWorker() != null) {
213 jobEntry.setMainWorker(null);
214 RollbackTask rollbackTask = new RollbackTask(jobEntry);
215 fjPool.execute(rollbackTask);
224 * RollbackTask is used to execute the RollbackCallable provided by the
225 * application in the eventuality of a failure.
227 private class RollbackTask implements Runnable {
228 private final JobEntry jobEntry;
230 RollbackTask(JobEntry jobEntry) {
231 this.jobEntry = jobEntry;
235 @SuppressWarnings("checkstyle:IllegalCatch")
237 RollbackCallable callable = jobEntry.getRollbackWorker();
238 callable.setFutures(jobEntry.getFutures());
239 List<ListenableFuture<Void>> futures = null;
242 futures = callable.call();
243 } catch (Exception e) {
244 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
247 if (futures == null || futures.isEmpty()) {
252 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
253 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
254 jobEntry.setFutures(futures);
259 * Execute the MainWorker callable.
261 private class MainTask implements Runnable {
262 private static final int LONG_JOBS_THRESHOLD = 1000; // MS
263 private final JobEntry jobEntry;
265 MainTask(JobEntry jobEntry) {
266 this.jobEntry = jobEntry;
270 @SuppressWarnings("checkstyle:illegalcatch")
272 List<ListenableFuture<Void>> futures = null;
273 long jobStartTimestamp = System.currentTimeMillis();
274 LOG.trace("Running job {}", jobEntry.getKey());
277 futures = jobEntry.getMainWorker().call();
278 long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
279 printJobs(jobEntry.getKey(), jobExecutionTime);
280 } catch (Exception e) {
281 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
284 if (futures == null || futures.isEmpty()) {
289 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
290 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
291 jobEntry.setFutures(futures);
294 private void printJobs(String key, long jobExecutionTime) {
295 if (jobExecutionTime > LONG_JOBS_THRESHOLD) {
296 LOG.warn("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
299 LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
303 private class JobQueueHandler implements Runnable {
305 @SuppressWarnings("checkstyle:illegalcatch")
307 LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
310 for (int i = 0; i < THREADPOOL_SIZE; i++) {
311 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
312 if (jobEntriesMap.isEmpty()) {
315 LOG.trace("JobQueueHandler handling queue {} with kesy size {}. Keys: {} ", i,
316 jobEntriesMap.size(), jobEntriesMap.keySet());
318 synchronized (jobEntriesMap) {
319 Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
320 while (it.hasNext()) {
321 Map.Entry<String, JobQueue> entry = it.next();
322 if (entry.getValue().getExecutingEntry() != null) {
325 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
326 if (jobEntry != null) {
327 entry.getValue().setExecutingEntry(jobEntry);
328 MainTask worker = new MainTask(jobEntry);
329 LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
330 fjPool.execute(worker);
331 DataStoreJobCoordinatorCounters.jobs_pending.dec();
340 reentrantLock.lock();
342 if (isJobQueueEmpty()) {
343 waitCondition.await();
346 reentrantLock.unlock();
348 } catch (Exception e) {
349 LOG.error("Exception while executing the tasks", e);
350 } catch (Throwable e) {
351 LOG.error("Error while executing the tasks", e);
357 private boolean isJobQueueEmpty() {
358 for (int i = 0; i < THREADPOOL_SIZE; i++) {
359 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
360 if (!jobEntriesMap.isEmpty()) {