/*
 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.genius.datastoreutils;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
30 public class DataStoreJobCoordinator {
32 private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
34 private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
35 private static final long RETRY_WAIT_BASE_TIME = 100;
37 // package local instead of private for TestDataStoreJobCoordinator
38 final ForkJoinPool fjPool;
39 final Map<Integer, Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
41 private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
42 private final ReentrantLock reentrantLock = new ReentrantLock();
43 private final Condition waitCondition = reentrantLock.newCondition();
45 private static DataStoreJobCoordinator instance;
48 instance = new DataStoreJobCoordinator();
51 public static DataStoreJobCoordinator getInstance() {
55 private DataStoreJobCoordinator() {
56 fjPool = new ForkJoinPool();
58 for (int i = 0; i < THREADPOOL_SIZE; i++) {
59 Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<>();
60 jobQueueMap.put(i, jobEntriesMap);
63 new Thread(new JobQueueHandler()).start();
66 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker) {
67 enqueueJob(key, mainWorker, null, 0);
70 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
71 RollbackCallable rollbackWorker) {
72 enqueueJob(key, mainWorker, rollbackWorker, 0);
75 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker, int maxRetries) {
76 enqueueJob(key, mainWorker, null, maxRetries);
79 public void enqueueJob(AbstractDataStoreJob job) throws InvalidJobException {
81 enqueueJob(job.getJobQueueKey(), job);
85 * This is used by the external applications to enqueue a Job
86 * with an appropriate key. A JobEntry is created and queued
89 public void enqueueJob(String key, Callable<List<ListenableFuture<Void>>> mainWorker,
90 RollbackCallable rollbackWorker, int maxRetries) {
91 JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
92 Integer hashKey = getHashKey(key);
93 LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
95 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
96 synchronized (jobEntriesMap) {
97 JobQueue jobQueue = jobEntriesMap.get(key);
98 if (jobQueue == null) {
99 jobQueue = new JobQueue();
101 LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
102 jobQueue.addEntry(jobEntry);
103 jobEntriesMap.put(key, jobQueue);
105 DataStoreJobCoordinatorCounters.jobs_pending.inc();
107 reentrantLock.lock();
109 waitCondition.signal();
111 reentrantLock.unlock();
116 * Cleanup the submitted job from the job queue.
118 private void clearJob(JobEntry jobEntry) {
119 Integer hashKey = getHashKey(jobEntry.getKey());
120 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
121 LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
122 synchronized (jobEntriesMap) {
123 JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
124 jobQueue.setExecutingEntry(null);
125 if (jobQueue.getWaitingEntries().isEmpty()) {
126 LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
127 jobEntriesMap.remove(jobEntry.getKey());
130 DataStoreJobCoordinatorCounters.jobs_cleared.inc();
134 * Used to generate the hashkey in to the jobQueueMap.
136 private Integer getHashKey(String key) {
137 int code = key.hashCode();
138 return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
142 * JobCallback class is used as a future callback for main and rollback
143 * workers to handle success and failure.
145 private class JobCallback implements FutureCallback<List<Void>> {
146 private final JobEntry jobEntry;
148 JobCallback(JobEntry jobEntry) {
149 this.jobEntry = jobEntry;
153 * This implies that all the future instances have returned
154 * success. -- TODO: Confirm this
157 public void onSuccess(List<Void> voids) {
158 LOG.trace("Job {} completed successfully", jobEntry.getKey());
163 * This method is used to handle failure callbacks. If more
164 * retry needed, the retrycount is decremented and mainworker
165 * is executed again. After retries completed, rollbackworker
166 * is executed. If rollbackworker fails, this is a
167 * double-fault. Double fault is logged and ignored.
170 public void onFailure(Throwable throwable) {
171 LOG.warn("Job: {} failed with exception: {} {}", jobEntry, throwable.getClass().getSimpleName(),
172 throwable.getStackTrace());
173 if (jobEntry.getMainWorker() == null) {
174 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
179 int retryCount = jobEntry.decrementRetryCountAndGet();
180 if (retryCount > 0) {
181 long waitTime = (RETRY_WAIT_BASE_TIME * 10) / retryCount;
182 scheduledExecutorService.schedule(() -> {
183 MainTask worker = new MainTask(jobEntry);
184 fjPool.execute(worker);
185 }, waitTime, TimeUnit.MILLISECONDS);
189 if (jobEntry.getRollbackWorker() != null) {
190 jobEntry.setMainWorker(null);
191 RollbackTask rollbackTask = new RollbackTask(jobEntry);
192 fjPool.execute(rollbackTask);
201 * RollbackTask is used to execute the RollbackCallable provided by the
202 * application in the eventuality of a failure.
204 private class RollbackTask implements Runnable {
205 private final JobEntry jobEntry;
207 RollbackTask(JobEntry jobEntry) {
208 this.jobEntry = jobEntry;
212 @SuppressWarnings("checkstyle:illegalcatch")
214 RollbackCallable callable = jobEntry.getRollbackWorker();
215 callable.setFutures(jobEntry.getFutures());
216 List<ListenableFuture<Void>> futures = null;
219 futures = callable.call();
220 } catch (Exception e) {
221 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
224 if (futures == null || futures.isEmpty()) {
229 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
230 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
231 jobEntry.setFutures(futures);
236 * Execute the MainWorker callable.
238 private class MainTask implements Runnable {
239 private static final int LONG_JOBS_THRESHOLD = 1000; // MS
240 private final JobEntry jobEntry;
242 MainTask(JobEntry jobEntry) {
243 this.jobEntry = jobEntry;
247 @SuppressWarnings("checkstyle:illegalcatch")
249 List<ListenableFuture<Void>> futures = null;
250 long jobStartTimestamp = System.currentTimeMillis();
251 LOG.trace("Running job {}", jobEntry.getKey());
254 futures = jobEntry.getMainWorker().call();
255 long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
256 printJobs(jobEntry.getKey(), jobExecutionTime);
257 } catch (Exception e) {
258 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
261 if (futures == null || futures.isEmpty()) {
266 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
267 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
268 jobEntry.setFutures(futures);
271 private void printJobs(String key, long jobExecutionTime) {
272 if (jobExecutionTime > LONG_JOBS_THRESHOLD) {
273 LOG.warn("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
276 LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
280 private class JobQueueHandler implements Runnable {
282 @SuppressWarnings("checkstyle:illegalcatch")
284 LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
287 for (int i = 0; i < THREADPOOL_SIZE; i++) {
288 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
289 if (jobEntriesMap.isEmpty()) {
292 LOG.trace("JobQueueHandler handling queue {} with kesy size {}. Keys: {} ", i,
293 jobEntriesMap.size(), Arrays.toString(jobEntriesMap.keySet().toArray()));
295 synchronized (jobEntriesMap) {
296 Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
297 while (it.hasNext()) {
298 Map.Entry<String, JobQueue> entry = it.next();
299 if (entry.getValue().getExecutingEntry() != null) {
302 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
303 if (jobEntry != null) {
304 entry.getValue().setExecutingEntry(jobEntry);
305 MainTask worker = new MainTask(jobEntry);
306 LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
307 fjPool.execute(worker);
308 DataStoreJobCoordinatorCounters.jobs_pending.dec();
312 DataStoreJobCoordinatorCounters.jobs_remove_entry.inc();
318 reentrantLock.lock();
320 if (isJobQueueEmpty()) {
321 waitCondition.await();
324 reentrantLock.unlock();
326 } catch (Exception e) {
327 LOG.error("Exception while executing the tasks {} ", e);
328 } catch (Throwable e) {
329 LOG.error("Error while executing the tasks {} ", e);
335 private boolean isJobQueueEmpty() {
336 for (int i = 0; i < THREADPOOL_SIZE; i++) {
337 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
338 if (!jobEntriesMap.isEmpty()) {