2 * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.vpnservice.datastoreutils;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
17 import java.util.Iterator;
18 import java.util.List;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ForkJoinPool;
23 import java.util.concurrent.TimeUnit;
25 public class DataStoreJobCoordinator {
26 private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
28 private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
30 private ForkJoinPool fjPool;
31 private Map<Integer,Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
33 private static DataStoreJobCoordinator instance;
36 instance = new DataStoreJobCoordinator();
39 public static DataStoreJobCoordinator getInstance() {
46 private DataStoreJobCoordinator() {
47 fjPool = new ForkJoinPool();
49 for (int i = 0; i < THREADPOOL_SIZE; i++) {
50 Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<String, JobQueue>();
51 jobQueueMap.put(i, jobEntriesMap);
54 new Thread(new JobQueueHandler()).start();
57 public void enqueueJob(String key,
58 Callable<List<ListenableFuture<Void>>> mainWorker) {
59 enqueueJob(key, mainWorker, null, 0);
62 public void enqueueJob(String key,
63 Callable<List<ListenableFuture<Void>>> mainWorker,
64 RollbackCallable rollbackWorker) {
65 enqueueJob(key, mainWorker, rollbackWorker, 0);
68 public void enqueueJob(String key,
69 Callable<List<ListenableFuture<Void>>> mainWorker,
71 enqueueJob(key, mainWorker, null, maxRetries);
78 * @param rollbackWorker
81 * This is used by the external applications to enqueue a Job with an appropriate key.
82 * A JobEntry is created and queued appropriately.
85 public void enqueueJob(String key,
86 Callable<List<ListenableFuture<Void>>> mainWorker,
87 RollbackCallable rollbackWorker,
89 JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
90 Integer hashKey = getHashKey(key);
91 LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
93 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
94 synchronized (jobEntriesMap) {
95 JobQueue jobQueue = jobEntriesMap.get(key);
96 if (jobQueue == null) {
97 jobQueue = new JobQueue();
99 jobQueue.addEntry(jobEntry);
100 jobEntriesMap.put(key, jobQueue);
103 jobQueueMap.put(hashKey, jobEntriesMap); // Is this really needed ?
107 * clearJob is used to cleanup the submitted job from the jobqueue.
109 private void clearJob(JobEntry jobEntry) {
110 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(getHashKey(jobEntry.getKey()));
111 synchronized (jobEntriesMap) {
112 JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
113 jobQueue.setExecutingEntry(null);
114 if (jobQueue.getWaitingEntries().isEmpty()) {
115 jobEntriesMap.remove(jobEntry.getKey());
123 * @return generated hashkey
125 * Used to generate the hashkey in to the jobQueueMap.
127 private Integer getHashKey(String key) {
128 int code = key.hashCode();
129 return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
133 * JobCallback class is used as a future callback for
134 * main and rollback workers to handle success and failure.
136 private class JobCallback implements FutureCallback<List<Void>> {
137 private JobEntry jobEntry;
139 public JobCallback(JobEntry jobEntry) {
140 this.jobEntry = jobEntry;
145 * This implies that all the future instances have returned success. -- TODO: Confirm this
148 public void onSuccess(List<Void> voids) {
155 * This method is used to handle failure callbacks.
156 * If more retry needed, the retrycount is decremented and mainworker is executed again.
157 * After retries completed, rollbackworker is executed.
158 * If rollbackworker fails, this is a double-fault. Double fault is logged and ignored.
162 public void onFailure(Throwable throwable) {
163 LOG.warn("Job: {} failed with exception: {}", jobEntry, throwable.getStackTrace());
164 if (jobEntry.getMainWorker() == null) {
165 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
170 if (jobEntry.decrementRetryCountAndGet() > 0) {
171 MainTask worker = new MainTask(jobEntry);
172 fjPool.execute(worker);
176 if (jobEntry.getRollbackWorker() != null) {
177 jobEntry.setMainWorker(null);
178 RollbackTask rollbackTask = new RollbackTask(jobEntry);
179 fjPool.execute(rollbackTask);
188 * RollbackTask is used to execute the RollbackCallable provided by the application
189 * in the eventuality of a failure.
192 private class RollbackTask implements Runnable {
193 private JobEntry jobEntry;
195 public RollbackTask(JobEntry jobEntry) {
196 this.jobEntry = jobEntry;
201 RollbackCallable callable = jobEntry.getRollbackWorker();
202 callable.setFutures(jobEntry.getFutures());
203 List<ListenableFuture<Void>> futures = null;
206 futures = callable.call();
207 } catch (Exception e){
208 LOG.error("Exception when executing jobEntry: {}, exception: {}", jobEntry, e.getStackTrace());
212 if (futures == null || futures.isEmpty()) {
217 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
218 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
219 jobEntry.setFutures(futures);
224 * MainTask is used to execute the MainWorker callable.
227 private class MainTask implements Runnable {
228 private JobEntry jobEntry;
230 public MainTask(JobEntry jobEntry) {
231 this.jobEntry = jobEntry;
236 List<ListenableFuture<Void>> futures = null;
238 futures = jobEntry.getMainWorker().call();
239 } catch (Exception e){
240 LOG.error("Exception when executing jobEntry: {}, exception: {}", jobEntry, e.getStackTrace());
244 if (futures == null || futures.isEmpty()) {
249 ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
250 Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
251 jobEntry.setFutures(futures);
255 private class JobQueueHandler implements Runnable {
258 LOG.debug("Starting JobQueue Handler Thread.");
261 boolean jobAddedToPool = false;
262 for (int i = 0; i < THREADPOOL_SIZE; i++) {
263 Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
264 if (jobEntriesMap.isEmpty()) {
268 synchronized (jobEntriesMap) {
269 Iterator it = jobEntriesMap.entrySet().iterator();
270 while (it.hasNext()) {
271 Map.Entry<String, JobQueue> entry = (Map.Entry)it.next();
272 if (entry.getValue().getExecutingEntry() != null) {
275 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
276 if (jobEntry != null) {
277 entry.getValue().setExecutingEntry(jobEntry);
278 MainTask worker = new MainTask(jobEntry);
279 fjPool.execute(worker);
280 jobAddedToPool = true;
288 if (!jobAddedToPool) {
289 TimeUnit.SECONDS.sleep(1);
291 } catch (Exception e) {
293 } catch (Throwable e) {