Merge "Add SFC relevant service binding constants"
[genius.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / genius / datastoreutils / DataStoreJobCoordinator.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.datastoreutils;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ForkJoinPool;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.locks.Condition;
25 import java.util.concurrent.locks.ReentrantLock;
26
27 public class DataStoreJobCoordinator {
28     private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
29
30     private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
31
32     private ForkJoinPool fjPool;
33     private Map<Integer,Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
34     private ReentrantLock reentrantLock = new ReentrantLock();
35     private Condition waitCondition = reentrantLock.newCondition();
36
37     private static DataStoreJobCoordinator instance;
38
39     static {
40         instance = new DataStoreJobCoordinator();
41     }
42
43     public static DataStoreJobCoordinator getInstance() {
44         return instance;
45     }
46
47     /**
48      *
49      */
50     private DataStoreJobCoordinator() {
51         fjPool = new ForkJoinPool();
52
53         for (int i = 0; i < THREADPOOL_SIZE; i++) {
54             Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<String, JobQueue>();
55             jobQueueMap.put(i, jobEntriesMap);
56         }
57
58         new Thread(new JobQueueHandler()).start();
59     }
60
61     public void enqueueJob(String key,
62             Callable<List<ListenableFuture<Void>>> mainWorker) {
63         enqueueJob(key, mainWorker, null, 0);
64     }
65
66     public void enqueueJob(String key,
67             Callable<List<ListenableFuture<Void>>> mainWorker,
68             RollbackCallable rollbackWorker) {
69         enqueueJob(key, mainWorker, rollbackWorker, 0);
70     }
71
72     public void enqueueJob(String key,
73             Callable<List<ListenableFuture<Void>>> mainWorker,
74             int maxRetries) {
75         enqueueJob(key, mainWorker, null, maxRetries);
76     }
77
78     /**
79      *
80      * @param key
81      * @param mainWorker
82      * @param rollbackWorker
83      * @param maxRetries
84      *
85      * This is used by the external applications to enqueue a Job with an appropriate key.
86      * A JobEntry is created and queued appropriately.
87      */
88
89     public void enqueueJob(String key,
90             Callable<List<ListenableFuture<Void>>> mainWorker,
91             RollbackCallable rollbackWorker,
92             int maxRetries) {
93         JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
94         Integer hashKey = getHashKey(key);
95         LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
96
97         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
98         synchronized (jobEntriesMap) {
99             JobQueue jobQueue = jobEntriesMap.get(key);
100             if (jobQueue == null) {
101                 jobQueue = new JobQueue();
102             }
103             LOG.trace("Adding jobkey {} to queue {} with size {}", key, hashKey, jobEntriesMap.size());
104             jobQueue.addEntry(jobEntry);
105             jobEntriesMap.put(key, jobQueue);
106         }
107         reentrantLock.lock();
108         try {
109             waitCondition.signal();
110         } finally {
111             reentrantLock.unlock();
112         }
113     }
114
115     /**
116      * clearJob is used to cleanup the submitted job from the jobqueue.
117      **/
118     private void clearJob(JobEntry jobEntry) {
119         Integer hashKey = getHashKey(jobEntry.getKey());
120         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
121         LOG.trace("About to clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
122         synchronized (jobEntriesMap) {
123             JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
124             jobQueue.setExecutingEntry(null);
125             if (jobQueue.getWaitingEntries().isEmpty()) {
126                 LOG.trace("Clear jobkey {} from queue {}", jobEntry.getKey(), hashKey);
127                 jobEntriesMap.remove(jobEntry.getKey());
128             }
129         }
130     }
131
132     /**
133      *
134      * @param key
135      * @return generated hashkey
136      *
137      * Used to generate the hashkey in to the jobQueueMap.
138      */
139     private Integer getHashKey(String key) {
140         int code = key.hashCode();
141         return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
142     }
143
144     /**
145      * JobCallback class is used as a future callback for
146      * main and rollback workers to handle success and failure.
147      */
148     private class JobCallback implements FutureCallback<List<Void>> {
149         private JobEntry jobEntry;
150
151         public JobCallback(JobEntry jobEntry) {
152             this.jobEntry = jobEntry;
153         }
154
155         /**
156          * @param voids
157          * This implies that all the future instances have returned success. -- TODO: Confirm this
158          */
159         @Override
160         public void onSuccess(List<Void> voids) {
161             LOG.trace("Job {} completed successfully", jobEntry.getKey());
162             clearJob(jobEntry);
163         }
164
165         /**
166          *
167          * @param throwable
168          * This method is used to handle failure callbacks.
169          * If more retry needed, the retrycount is decremented and mainworker is executed again.
170          * After retries completed, rollbackworker is executed.
171          * If rollbackworker fails, this is a double-fault. Double fault is logged and ignored.
172          */
173
174         @Override
175         public void onFailure(Throwable throwable) {
176             LOG.warn("Job: {} failed with exception: {} {}", jobEntry, throwable.getClass().getSimpleName(),
177                     throwable.getStackTrace());
178             if (jobEntry.getMainWorker() == null) {
179                 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
180                 clearJob(jobEntry);
181                 return;
182             }
183
184             if (jobEntry.decrementRetryCountAndGet() > 0) {
185                 MainTask worker = new MainTask(jobEntry);
186                 fjPool.execute(worker);
187                 return;
188             }
189
190             if (jobEntry.getRollbackWorker() != null) {
191                 jobEntry.setMainWorker(null);
192                 RollbackTask rollbackTask = new RollbackTask(jobEntry);
193                 fjPool.execute(rollbackTask);
194                 return;
195             }
196
197             clearJob(jobEntry);
198         }
199     }
200
201     /**
202      * RollbackTask is used to execute the RollbackCallable provided by the application
203      * in the eventuality of a failure.
204      */
205
206     private class RollbackTask implements Runnable {
207         private JobEntry jobEntry;
208
209         public RollbackTask(JobEntry jobEntry) {
210             this.jobEntry = jobEntry;
211         }
212
213         @Override
214         public void run() {
215             RollbackCallable callable = jobEntry.getRollbackWorker();
216             callable.setFutures(jobEntry.getFutures());
217             List<ListenableFuture<Void>> futures = null;
218
219             try {
220                 futures = callable.call();
221             } catch (Exception e){
222                 LOG.error("Exception when executing jobEntry: {}, exception: {}", jobEntry, e.getStackTrace());
223                 e.printStackTrace();
224             }
225
226             if (futures == null || futures.isEmpty()) {
227                 clearJob(jobEntry);
228                 return;
229             }
230
231             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
232             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
233             jobEntry.setFutures(futures);
234         }
235     }
236
237     /**
238      * MainTask is used to execute the MainWorker callable.
239      */
240
241     private class MainTask implements Runnable {
242         private JobEntry jobEntry;
243
244         public MainTask(JobEntry jobEntry) {
245             this.jobEntry = jobEntry;
246         }
247
248         @Override
249         public void run() {
250             List<ListenableFuture<Void>> futures = null;
251             long jobStartTimestamp = System.currentTimeMillis();
252             LOG.trace("Running job {}", jobEntry.getKey());
253
254             try {
255                 futures = jobEntry.getMainWorker().call();
256                 long jobExecutionTime = System.currentTimeMillis() - jobStartTimestamp;
257                 LOG.trace("Job {} took {}ms to complete", jobEntry.getKey(), jobExecutionTime);
258             } catch (Exception e){
259                 LOG.error("Exception when executing jobEntry: {}", jobEntry, e);
260             }
261
262             if (futures == null || futures.isEmpty()) {
263                 clearJob(jobEntry);
264                 return;
265             }
266
267             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
268             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
269             jobEntry.setFutures(futures);
270         }
271     }
272
273     private class JobQueueHandler implements Runnable {
274         @Override
275         public void run() {
276             LOG.info("Starting JobQueue Handler Thread with pool size {}", THREADPOOL_SIZE);
277             while (true) {
278                 try {
279                     for (int i = 0; i < THREADPOOL_SIZE; i++) {
280                         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
281                         if (jobEntriesMap.isEmpty()) {
282                             continue;
283                         }
284
285                         LOG.trace("JobQueueHandler handling queue {} with size {}", i, jobEntriesMap.size());
286                         synchronized (jobEntriesMap) {
287                             Iterator<Map.Entry<String, JobQueue>> it = jobEntriesMap.entrySet().iterator();
288                             while (it.hasNext()) {
289                                 Map.Entry<String, JobQueue> entry = it.next();
290                                 if (entry.getValue().getExecutingEntry() != null) {
291                                     continue;
292                                 }
293                                 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
294                                 if (jobEntry != null) {
295                                     entry.getValue().setExecutingEntry(jobEntry);
296                                     MainTask worker = new MainTask(jobEntry);
297                                     LOG.trace("Executing job {} from queue {}", jobEntry.getKey(), i);
298                                     fjPool.execute(worker);
299                                 } else {
300                                     it.remove();
301                                 }
302                             }
303                         }
304                     }
305
306                     reentrantLock.lock();
307                     try {
308                         if (isJobQueueEmpty()) {
309                             waitCondition.await();
310                         }
311                     } finally {
312                         reentrantLock.unlock();
313                     }
314                 } catch (Exception e) {
315                     LOG.error("Exception while executing the tasks {} ", e);
316                 } catch (Throwable e) {
317                     LOG.error("Error while executing the tasks {} ", e);
318                 }
319             }
320         }
321     }
322
323     private boolean isJobQueueEmpty() {
324         for (int i = 0; i < THREADPOOL_SIZE; i++) {
325             Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
326             if (!jobEntriesMap.isEmpty()) {
327                 return false;
328             }
329         }
330
331         return true;
332     }
333 }