2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.datastoreutils;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
28 public class DataStoreJobCoordinator {
30 private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
32 private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
33 private static final long RETRY_WAIT_BASE_TIME = 100;
35 private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
36 private final ForkJoinPool fjPool;
37 private final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
38 private final ReentrantLock reentrantLock = new ReentrantLock();
39 private final Condition waitCondition = reentrantLock.newCondition();
41 private static DataStoreJobCoordinator instance;
44 instance = new DataStoreJobCoordinator();
47 public static DataStoreJobCoordinator getInstance() {
51 private DataStoreJobCoordinator() {
52 fjPool = new ForkJoinPool();
54 for (int i = 0; i < THREADPOOL_SIZE; i++) {
55 Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
56 jobQueueMap.put(i, jobEntriesMap);
59 new Thread(new JobQueueHandler()).start();
62 public void enqueueJob(String key,
63 Callable<List<ListenableFuture<Void>>> mainWorker) {
64 enqueueJob(key, mainWorker, null, 0);
67 public void enqueueJob(String key,
68 Callable<List<ListenableFuture<Void>>> mainWorker,
69 RollbackCallable rollbackWorker) {
70 enqueueJob(key, mainWorker, rollbackWorker, 0);
73 public void enqueueJob(String key,
74 Callable<List<ListenableFuture<Void>>> mainWorker,
76 enqueueJob(key, mainWorker, null, maxRetries);
79 public void enqueueJob(AbstractDataStoreJob job) throws InvalidJobException {
81 enqueueJob(job.getJobQueueKey(), job);
85 * Enqueue a Job with an appropriate key.
86 * A JobEntry is created and queued appropriately.
88 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
89 RollbackCallable rollbackWorker, int maxRetries) {
90 JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
91 Integer hashKey = getHashKey(key);
92 LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
94 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
95 synchronized (jobEntriesMap) {
96 JobQueue jobQueue = jobEntriesMap.get(key);
97 if (jobQueue == null) {
98 jobQueue = new JobQueue();
100 LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
101 jobQueue.addEntry(jobEntry);
102 jobEntriesMap.put(key, jobQueue);
104 reentrantLock.lock();
106 waitCondition.signal();
108 reentrantLock.unlock();
113 * Cleanup the submitted job from the job queue.
115 private void clearJob(JobEntry jobEntry) {
116 Integer hashKey = getHashKey(jobEntry.getKey());
117 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
118 LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
119 synchronized (jobEntriesMap) {
120 JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
121 jobQueue.setExecutingEntry(null);
122 if (jobQueue.getWaitingEntries().isEmpty()) {
123 LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
124 jobEntriesMap.remove(jobEntry.getKey());
130 * Generate the hashkey for the jobQueueMap.
132 private Integer getHashKey(String key) {
133 int code = key.hashCode();
134 return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
138 * JobCallback class is used as a future callback for
139 * main and rollback workers to handle success and failure.
141 private class JobCallback implements FutureCallback<List<Void>> {
142 private final JobEntry jobEntry;
144 JobCallback(JobEntry jobEntry) {
145 this.jobEntry = jobEntry;
149 * This implies that all the future instances have returned success. -- TODO: Confirm this
152 public void onSuccess(List<Void> voids) {
153 LOG.trace("Job {} completed successfully", jobEntry.getKey());
158 * Handle failure callbacks.
159 * If more retry needed, the retrycount is decremented and mainworker is executed again.
160 * After retries completed, rollbackworker is executed.
161 * If rollbackworker fails, this is a double-fault. Double fault is logged and ignored.
164 public void onFailure(Throwable throwable) {
165 LOG.warn("Job: {} failed with exception: {} {}", jobEntry, throwable.getClass().getSimpleName(),
166 throwable.getStackTrace());
167 if (jobEntry.getMainWorker() == null) {
168 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
173 int retryCount = jobEntry.decrementRetryCountAndGet();
174 if ( retryCount > 0) {
175 long waitTime = RETRY_WAIT_BASE_TIME * 10 / retryCount;
176 scheduledExecutorService.schedule(
178 MainTask worker = new MainTask(jobEntry);
179 fjPool.execute(worker);
182 TimeUnit.MILLISECONDS);
186 if (jobEntry.getRollbackWorker() != null) {
187 jobEntry.setMainWorker(null);
188 RollbackTask rollbackTask = new RollbackTask(jobEntry);
189 fjPool.execute(rollbackTask);
198 * Execute the RollbackCallable provided by the application in the eventuality of a failure.
200 private class RollbackTask implements Runnable {
201 private final JobEntry jobEntry;
203 RollbackTask(JobEntry jobEntry) {
204 this.jobEntry = jobEntry;
208 @SuppressWarnings("checkstyle:illegalcatch")
210 RollbackCallable callable = jobEntry.getRollbackWorker();
211 callable.setFutures(jobEntry.getFutures());
212 List<ListenableFuture<Void>> futures = null;
215 futures = callable.call();
216 } catch (Exception e) {
217 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
220 if (futures == null || futures.isEmpty()) {
225 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
226 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
227 jobEntry.setFutures(futures);
232 * Execute the MainWorker callable.
234 private class MainTask implements Runnable {
235 private final JobEntry jobEntry;
237 MainTask(JobEntry jobEntry) {
238 this.jobEntry = jobEntry;
242 @SuppressWarnings("checkstyle:illegalcatch")
244 List<ListenableFuture<Void>> futures = null;
245 long jobStartTimestamp = System.currentTimeMillis();
246 LOG.trace("Running job {}", jobEntry.getKey());
249 futures = jobEntry.getMainWorker().call();
250 long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
251 LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
252 } catch (Exception e) {
253 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
256 if (futures == null || futures.isEmpty()) {
261 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
262 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
263 jobEntry.setFutures(futures);
267 private class JobQueueHandler implements Runnable {
269 @SuppressWarnings("checkstyle:illegalcatch")
271 LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
274 for (int i = 0; i < THREADPOOL_SIZE; i++) {
275 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
276 if (jobEntriesMap.isEmpty()) {
280 LOG.trace("JobQueueHandler handling queue {} with size {}", i, jobEntriesMap.size());
281 synchronized (jobEntriesMap) {
282 Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
283 while (it.hasNext()) {
284 Map.Entry<String, JobQueue> entry = it.next();
285 if (entry.getValue().getExecutingEntry() != null) {
288 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
289 if (jobEntry != null) {
290 entry.getValue().setExecutingEntry(jobEntry);
291 MainTask worker = new MainTask(jobEntry);
292 LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
293 fjPool.execute(worker);
301 reentrantLock.lock();
303 if (isJobQueueEmpty()) {
304 waitCondition.await();
307 reentrantLock.unlock();
309 } catch (Exception e) {
310 LOG.error("Exception while executing the tasks {} ", e);
311 } catch (Throwable e) {
312 LOG.error("Error while executing the tasks {} ", e);
318 private boolean isJobQueueEmpty() {
319 for (int i = 0; i < THREADPOOL_SIZE; i++) {
320 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
321 if (!jobEntriesMap.isEmpty()) {