2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.datastoreutils;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
17 import java.util.Iterator;
18 import java.util.List;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ForkJoinPool;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.locks.Condition;
25 import java.util.concurrent.locks.ReentrantLock;
27 public class DataStoreJobCoordinator {
28 private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
30 private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
32 private ForkJoinPool fjPool;
33 private Map<Integer,Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
34 private ReentrantLock reentrantLock = new ReentrantLock();
35 private Condition waitCondition = reentrantLock.newCondition();
37 private static DataStoreJobCoordinator instance;
40 instance = new DataStoreJobCoordinator();
43 public static DataStoreJobCoordinator getInstance() {
50 private DataStoreJobCoordinator() {
51 fjPool = new ForkJoinPool();
53 for (int i = 0; i < THREADPOOL_SIZE; i++) {
54 Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<String, JobQueue>();
55 jobQueueMap.put(i, jobEntriesMap);
58 new Thread(new JobQueueHandler()).start();
61 public void enqueueJob(String key,
62 Callable<List<ListenableFuture<Void>>> mainWorker) {
63 enqueueJob(key, mainWorker, null, 0);
66 public void enqueueJob(String key,
67 Callable<List<ListenableFuture<Void>>> mainWorker,
68 RollbackCallable rollbackWorker) {
69 enqueueJob(key, mainWorker, rollbackWorker, 0);
72 public void enqueueJob(String key,
73 Callable<List<ListenableFuture<Void>>> mainWorker,
75 enqueueJob(key, mainWorker, null, maxRetries);
82 * @param rollbackWorker
85 * This is used by the external applications to enqueue a Job with an appropriate key.
86 * A JobEntry is created and queued appropriately.
89 public void enqueueJob(String key,
90 Callable<List<ListenableFuture<Void>>> mainWorker,
91 RollbackCallable rollbackWorker,
93 JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
94 Integer hashKey = getHashKey(key);
95 LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
97 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
98 synchronized (jobEntriesMap) {
99 JobQueue jobQueue = jobEntriesMap.get(key);
100 if (jobQueue == null) {
101 jobQueue = new JobQueue();
103 LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
104 jobQueue.addEntry(jobEntry);
105 jobEntriesMap.put(key, jobQueue);
107 reentrantLock.lock();
109 waitCondition.signal();
111 reentrantLock.unlock();
116 * clearJob is used to cleanup the submitted job from the jobqueue.
118 private void clearJob(JobEntry jobEntry) {
119 Integer hashKey = getHashKey(jobEntry.getKey());
120 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
121 LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
122 synchronized (jobEntriesMap) {
123 JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
124 jobQueue.setExecutingEntry(null);
125 if (jobQueue.getWaitingEntries().isEmpty()) {
126 LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
127 jobEntriesMap.remove(jobEntry.getKey());
135 * @return generated hashkey
137 * Used to generate the hashkey in to the jobQueueMap.
139 private Integer getHashKey(String key) {
140 int code = key.hashCode();
141 return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
145 * JobCallback class is used as a future callback for
146 * main and rollback workers to handle success and failure.
148 private class JobCallback implements FutureCallback<List<Void>> {
149 private JobEntry jobEntry;
151 public JobCallback(JobEntry jobEntry) {
152 this.jobEntry = jobEntry;
157 * This implies that all the future instances have returned success. -- TODO: Confirm this
160 public void onSuccess(List<Void> voids) {
161 LOG.trace("Job {} completed successfully", jobEntry.getKey());
168 * This method is used to handle failure callbacks.
169 * If more retry needed, the retrycount is decremented and mainworker is executed again.
170 * After retries completed, rollbackworker is executed.
171 * If rollbackworker fails, this is a double-fault. Double fault is logged and ignored.
175 public void onFailure(Throwable throwable) {
176 LOG.warn("Job: {} failed with exception: {} {}", jobEntry, throwable.getClass().getSimpleName(),
177 throwable.getStackTrace());
178 if (jobEntry.getMainWorker() == null) {
179 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
184 if (jobEntry.decrementRetryCountAndGet() > 0) {
185 MainTask worker = new MainTask(jobEntry);
186 fjPool.execute(worker);
190 if (jobEntry.getRollbackWorker() != null) {
191 jobEntry.setMainWorker(null);
192 RollbackTask rollbackTask = new RollbackTask(jobEntry);
193 fjPool.execute(rollbackTask);
202 * RollbackTask is used to execute the RollbackCallable provided by the application
203 * in the eventuality of a failure.
206 private class RollbackTask implements Runnable {
207 private JobEntry jobEntry;
209 public RollbackTask(JobEntry jobEntry) {
210 this.jobEntry = jobEntry;
215 RollbackCallable callable = jobEntry.getRollbackWorker();
216 callable.setFutures(jobEntry.getFutures());
217 List<ListenableFuture<Void>> futures = null;
220 futures = callable.call();
221 } catch (Exception e){
222 LOG.error("Exception when executing jobEntry: {}, exception: {}", jobEntry, e.getStackTrace());
226 if (futures == null || futures.isEmpty()) {
231 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
232 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
233 jobEntry.setFutures(futures);
238 * MainTask is used to execute the MainWorker callable.
241 private class MainTask implements Runnable {
242 private JobEntry jobEntry;
244 public MainTask(JobEntry jobEntry) {
245 this.jobEntry = jobEntry;
250 List<ListenableFuture<Void>> futures = null;
251 long jobStartTimestamp = System.currentTimeMillis();
252 LOG.trace("Running job {}", jobEntry.getKey());
255 futures = jobEntry.getMainWorker().call();
256 long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
257 LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
258 } catch (Exception e){
259 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
262 if (futures == null || futures.isEmpty()) {
267 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
268 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
269 jobEntry.setFutures(futures);
273 private class JobQueueHandler implements Runnable {
276 LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
279 for (int i = 0; i < THREADPOOL_SIZE; i++) {
280 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
281 if (jobEntriesMap.isEmpty()) {
285 LOG.trace("JobQueueHandler handling queue {} with size {}", i, jobEntriesMap.size());
286 synchronized (jobEntriesMap) {
287 Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
288 while (it.hasNext()) {
289 Map.Entry<String, JobQueue> entry = it.next();
290 if (entry.getValue().getExecutingEntry() != null) {
293 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
294 if (jobEntry != null) {
295 entry.getValue().setExecutingEntry(jobEntry);
296 MainTask worker = new MainTask(jobEntry);
297 LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
298 fjPool.execute(worker);
306 reentrantLock.lock();
308 if (isJobQueueEmpty()) {
309 waitCondition.await();
312 reentrantLock.unlock();
314 } catch (Exception e) {
315 LOG.error("Exception while executing the tasks {} ", e);
316 } catch (Throwable e) {
317 LOG.error("Error while executing the tasks {} ", e);
323 private boolean isJobQueueEmpty() {
324 for (int i = 0; i < THREADPOOL_SIZE; i++) {
325 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
326 if (!jobEntriesMap.isEmpty()) {