2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.datastoreutils;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.Iterator;
15 import java.util.List;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ForkJoinPool;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.ReentrantLock;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
28 public class DataStoreJobCoordinator {
30 private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
32 private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
33 private static final long RETRY_WAIT_BASE_TIME = 100;
35 // package local instead of private for TestDataStoreJobCoordinator
36 final ForkJoinPool fjPool;
37 final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
39 private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
40 private final ReentrantLock reentrantLock = new ReentrantLock();
41 private final Condition waitCondition = reentrantLock.newCondition();
43 private static DataStoreJobCoordinator instance;
46 instance = new DataStoreJobCoordinator();
49 public static DataStoreJobCoordinator getInstance() {
53 private DataStoreJobCoordinator() {
54 fjPool = new ForkJoinPool();
56 for (int i = 0; i < THREADPOOL_SIZE; i++) {
57 Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
58 jobQueueMap.put(i, jobEntriesMap);
61 new Thread(new JobQueueHandler()).start();
64 public void enqueueJob(String key,
65 Callable<List<ListenableFuture<Void>>> mainWorker) {
66 enqueueJob(key, mainWorker, null, 0);
69 public void enqueueJob(String key,
70 Callable<List<ListenableFuture<Void>>> mainWorker,
71 RollbackCallable rollbackWorker) {
72 enqueueJob(key, mainWorker, rollbackWorker, 0);
75 public void enqueueJob(String key,
76 Callable<List<ListenableFuture<Void>>> mainWorker,
78 enqueueJob(key, mainWorker, null, maxRetries);
81 public void enqueueJob(AbstractDataStoreJob job) throws InvalidJobException {
83 enqueueJob(job.getJobQueueKey(), job);
87 * Enqueue a Job with an appropriate key.
88 * A JobEntry is created and queued appropriately.
90 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
91 RollbackCallable rollbackWorker, int maxRetries) {
92 JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
93 Integer hashKey = getHashKey(key);
94 LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
96 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
97 synchronized (jobEntriesMap) {
98 JobQueue jobQueue = jobEntriesMap.get(key);
99 if (jobQueue == null) {
100 jobQueue = new JobQueue();
102 LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
103 jobQueue.addEntry(jobEntry);
104 jobEntriesMap.put(key, jobQueue);
106 reentrantLock.lock();
108 waitCondition.signal();
110 reentrantLock.unlock();
115 * Cleanup the submitted job from the job queue.
117 private void clearJob(JobEntry jobEntry) {
118 Integer hashKey = getHashKey(jobEntry.getKey());
119 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
120 LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
121 synchronized (jobEntriesMap) {
122 JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
123 jobQueue.setExecutingEntry(null);
124 if (jobQueue.getWaitingEntries().isEmpty()) {
125 LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
126 jobEntriesMap.remove(jobEntry.getKey());
132 * Generate the hashkey for the jobQueueMap.
134 private Integer getHashKey(String key) {
135 int code = key.hashCode();
136 return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
140 * JobCallback class is used as a future callback for
141 * main and rollback workers to handle success and failure.
143 private class JobCallback implements FutureCallback<List<Void>> {
144 private final JobEntry jobEntry;
146 JobCallback(JobEntry jobEntry) {
147 this.jobEntry = jobEntry;
151 * This implies that all the future instances have returned success. -- TODO: Confirm this
154 public void onSuccess(List<Void> voids) {
155 LOG.trace("Job {} completed successfully", jobEntry.getKey());
160 * Handle failure callbacks.
161 * If more retry needed, the retrycount is decremented and mainworker is executed again.
162 * After retries completed, rollbackworker is executed.
163 * If rollbackworker fails, this is a double-fault. Double fault is logged and ignored.
166 public void onFailure(Throwable throwable) {
167 LOG.warn("Job: {} failed with exception: {} {}", jobEntry, throwable.getClass().getSimpleName(),
168 throwable.getStackTrace());
169 if (jobEntry.getMainWorker() == null) {
170 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
175 int retryCount = jobEntry.decrementRetryCountAndGet();
176 if ( retryCount > 0) {
177 long waitTime = RETRY_WAIT_BASE_TIME * 10 / retryCount;
178 scheduledExecutorService.schedule(
180 MainTask worker = new MainTask(jobEntry);
181 fjPool.execute(worker);
184 TimeUnit.MILLISECONDS);
188 if (jobEntry.getRollbackWorker() != null) {
189 jobEntry.setMainWorker(null);
190 RollbackTask rollbackTask = new RollbackTask(jobEntry);
191 fjPool.execute(rollbackTask);
200 * Execute the RollbackCallable provided by the application in the eventuality of a failure.
202 private class RollbackTask implements Runnable {
203 private final JobEntry jobEntry;
205 RollbackTask(JobEntry jobEntry) {
206 this.jobEntry = jobEntry;
210 @SuppressWarnings("checkstyle:illegalcatch")
212 RollbackCallable callable = jobEntry.getRollbackWorker();
213 callable.setFutures(jobEntry.getFutures());
214 List<ListenableFuture<Void>> futures = null;
217 futures = callable.call();
218 } catch (Exception e) {
219 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
222 if (futures == null || futures.isEmpty()) {
227 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
228 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
229 jobEntry.setFutures(futures);
234 * Execute the MainWorker callable.
236 private class MainTask implements Runnable {
237 private final JobEntry jobEntry;
239 MainTask(JobEntry jobEntry) {
240 this.jobEntry = jobEntry;
244 @SuppressWarnings("checkstyle:illegalcatch")
246 List<ListenableFuture<Void>> futures = null;
247 long jobStartTimestamp = System.currentTimeMillis();
248 LOG.trace("Running job {}", jobEntry.getKey());
251 futures = jobEntry.getMainWorker().call();
252 long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
253 LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
254 } catch (Exception e) {
255 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
258 if (futures == null || futures.isEmpty()) {
263 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
264 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
265 jobEntry.setFutures(futures);
269 private class JobQueueHandler implements Runnable {
271 @SuppressWarnings("checkstyle:illegalcatch")
273 LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
276 for (int i = 0; i < THREADPOOL_SIZE; i++) {
277 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
278 if (jobEntriesMap.isEmpty()) {
282 LOG.trace("JobQueueHandler handling queue {} with size {}", i, jobEntriesMap.size());
283 synchronized (jobEntriesMap) {
284 Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
285 while (it.hasNext()) {
286 Map.Entry<String, JobQueue> entry = it.next();
287 if (entry.getValue().getExecutingEntry() != null) {
290 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
291 if (jobEntry != null) {
292 entry.getValue().setExecutingEntry(jobEntry);
293 MainTask worker = new MainTask(jobEntry);
294 LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
295 fjPool.execute(worker);
303 reentrantLock.lock();
305 if (isJobQueueEmpty()) {
306 waitCondition.await();
309 reentrantLock.unlock();
311 } catch (Exception e) {
312 LOG.error("Exception while executing the tasks {} ", e);
313 } catch (Throwable e) {
314 LOG.error("Error while executing the tasks {} ", e);
320 private boolean isJobQueueEmpty() {
321 for (int i = 0; i < THREADPOOL_SIZE; i++) {
322 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
323 if (!jobEntriesMap.isEmpty()) {