Working with OVS
[vpnservice.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / vpnservice / datastoreutils / DataStoreJobCoordinator.java
1 /*
2  * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.vpnservice.datastoreutils;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ForkJoinPool;
23 import java.util.concurrent.TimeUnit;
24
25 public class DataStoreJobCoordinator {
26     private static final Logger LOG = LoggerFactory.getLogger(DataStoreJobCoordinator.class);
27
28     private static final int THREADPOOL_SIZE = Runtime.getRuntime().availableProcessors();
29
30     private ForkJoinPool fjPool;
31     private Map<Integer,Map<String, JobQueue>> jobQueueMap = new ConcurrentHashMap<>();
32
33     private static DataStoreJobCoordinator instance;
34
35     static {
36         instance = new DataStoreJobCoordinator();
37     }
38
39     public static DataStoreJobCoordinator getInstance() {
40         return instance;
41     }
42
43     /**
44      *
45      */
46     private DataStoreJobCoordinator() {
47         fjPool = new ForkJoinPool();
48
49         for (int i = 0; i < THREADPOOL_SIZE; i++) {
50             Map<String, JobQueue> jobEntriesMap = new ConcurrentHashMap<String, JobQueue>();
51             jobQueueMap.put(i, jobEntriesMap);
52         }
53
54         new Thread(new JobQueueHandler()).start();
55     }
56
57     public void enqueueJob(String key,
58                            Callable<List<ListenableFuture<Void>>> mainWorker) {
59         enqueueJob(key, mainWorker, null, 0);
60     }
61
62     public void enqueueJob(String key,
63                            Callable<List<ListenableFuture<Void>>> mainWorker,
64                            RollbackCallable rollbackWorker) {
65         enqueueJob(key, mainWorker, rollbackWorker, 0);
66     }
67
68     public void enqueueJob(String key,
69                            Callable<List<ListenableFuture<Void>>> mainWorker,
70                            int maxRetries) {
71         enqueueJob(key, mainWorker, null, maxRetries);
72     }
73
74     /**
75      *
76      * @param key
77      * @param mainWorker
78      * @param rollbackWorker
79      * @param maxRetries
80      *
81      * This is used by the external applications to enqueue a Job with an appropriate key.
82      * A JobEntry is created and queued appropriately.
83      */
84
85     public void enqueueJob(String key,
86                            Callable<List<ListenableFuture<Void>>> mainWorker,
87                            RollbackCallable rollbackWorker,
88                            int maxRetries) {
89         JobEntry jobEntry = new JobEntry(key, mainWorker, rollbackWorker, maxRetries);
90         Integer hashKey = getHashKey(key);
91         LOG.debug("Obtained Hashkey: {}, for jobkey: {}", hashKey, key);
92
93         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(hashKey);
94         synchronized (jobEntriesMap) {
95             JobQueue jobQueue = jobEntriesMap.get(key);
96             if (jobQueue == null) {
97                 jobQueue = new JobQueue();
98             }
99             jobQueue.addEntry(jobEntry);
100             jobEntriesMap.put(key, jobQueue);
101         }
102
103         jobQueueMap.put(hashKey, jobEntriesMap); // Is this really needed ?
104     }
105
106     /**
107      * clearJob is used to cleanup the submitted job from the jobqueue.
108      **/
109     private void clearJob(JobEntry jobEntry) {
110         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(getHashKey(jobEntry.getKey()));
111         synchronized (jobEntriesMap) {
112             JobQueue jobQueue = jobEntriesMap.get(jobEntry.getKey());
113             jobQueue.setExecutingEntry(null);
114             if (jobQueue.getWaitingEntries().isEmpty()) {
115                 jobEntriesMap.remove(jobEntry.getKey());
116             }
117         }
118     }
119
120     /**
121      *
122      * @param key
123      * @return generated hashkey
124      *
125      * Used to generate the hashkey in to the jobQueueMap.
126      */
127     private Integer getHashKey(String key) {
128         int code = key.hashCode();
129         return (code % THREADPOOL_SIZE + THREADPOOL_SIZE) % THREADPOOL_SIZE;
130     }
131
132     /**
133      * JobCallback class is used as a future callback for
134      * main and rollback workers to handle success and failure.
135      */
136     private class JobCallback implements FutureCallback<List<Void>> {
137         private JobEntry jobEntry;
138
139         public JobCallback(JobEntry jobEntry) {
140             this.jobEntry = jobEntry;
141         }
142
143         /**
144          * @param voids
145          * This implies that all the future instances have returned success. -- TODO: Confirm this
146          */
147         @Override
148         public void onSuccess(List<Void> voids) {
149             clearJob(jobEntry);
150         }
151
152         /**
153          *
154          * @param throwable
155          * This method is used to handle failure callbacks.
156          * If more retry needed, the retrycount is decremented and mainworker is executed again.
157          * After retries completed, rollbackworker is executed.
158          * If rollbackworker fails, this is a double-fault. Double fault is logged and ignored.
159          */
160
161         @Override
162         public void onFailure(Throwable throwable) {
163             LOG.warn("Job: {} failed with exception: {}", jobEntry, throwable.getStackTrace());
164             if (jobEntry.getMainWorker() == null) {
165                 LOG.error("Job: {} failed with Double-Fault. Bailing Out.", jobEntry);
166                 clearJob(jobEntry);
167                 return;
168             }
169
170             if (jobEntry.decrementRetryCountAndGet() > 0) {
171                 MainTask worker = new MainTask(jobEntry);
172                 fjPool.execute(worker);
173                 return;
174             }
175
176             if (jobEntry.getRollbackWorker() != null) {
177                 jobEntry.setMainWorker(null);
178                 RollbackTask rollbackTask = new RollbackTask(jobEntry);
179                 fjPool.execute(rollbackTask);
180                 return;
181             }
182
183             clearJob(jobEntry);
184         }
185     }
186
187     /**
188      * RollbackTask is used to execute the RollbackCallable provided by the application
189      * in the eventuality of a failure.
190      */
191
192     private class RollbackTask implements Runnable {
193         private JobEntry jobEntry;
194
195         public RollbackTask(JobEntry jobEntry) {
196             this.jobEntry = jobEntry;
197         }
198
199         @Override
200         public void run() {
201             RollbackCallable callable = jobEntry.getRollbackWorker();
202             callable.setFutures(jobEntry.getFutures());
203             List<ListenableFuture<Void>> futures = null;
204
205             try {
206                 futures = callable.call();
207             } catch (Exception e){
208                 LOG.error("Exception when executing jobEntry: {}, exception: {}", jobEntry, e.getStackTrace());
209                 e.printStackTrace();
210             }
211
212             if (futures == null || futures.isEmpty()) {
213                 clearJob(jobEntry);
214                 return;
215             }
216
217             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
218             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
219             jobEntry.setFutures(futures);
220         }
221     }
222
223     /**
224      * MainTask is used to execute the MainWorker callable.
225      */
226
227     private class MainTask implements Runnable {
228         private JobEntry jobEntry;
229
230         public MainTask(JobEntry jobEntry) {
231             this.jobEntry = jobEntry;
232         }
233
234         @Override
235         public void run() {
236             List<ListenableFuture<Void>> futures = null;
237             try {
238                 futures = jobEntry.getMainWorker().call();
239             } catch (Exception e){
240                 LOG.error("Exception when executing jobEntry: {}, exception: {}", jobEntry, e.getStackTrace());
241                 e.printStackTrace();
242             }
243
244             if (futures == null || futures.isEmpty()) {
245                 clearJob(jobEntry);
246                 return;
247             }
248
249             ListenableFuture<List<Void>> listenableFuture = Futures.allAsList(futures);
250             Futures.addCallback(listenableFuture, new JobCallback(jobEntry));
251             jobEntry.setFutures(futures);
252         }
253     }
254
255     private class JobQueueHandler implements Runnable {
256         @Override
257         public void run() {
258             LOG.debug("Starting JobQueue Handler Thread.");
259             while (true) {
260                 try {
261                     boolean jobAddedToPool = false;
262                     for (int i = 0; i < THREADPOOL_SIZE; i++) {
263                         Map<String, JobQueue> jobEntriesMap = jobQueueMap.get(i);
264                         if (jobEntriesMap.isEmpty()) {
265                             continue;
266                         }
267
268                         synchronized (jobEntriesMap) {
269                             Iterator it = jobEntriesMap.entrySet().iterator();
270                             while (it.hasNext()) {
271                                 Map.Entry<String, JobQueue> entry = (Map.Entry)it.next();
272                                 if (entry.getValue().getExecutingEntry() != null) {
273                                     continue;
274                                 }
275                                 JobEntry jobEntry = entry.getValue().getWaitingEntries().poll();
276                                 if (jobEntry != null) {
277                                     entry.getValue().setExecutingEntry(jobEntry);
278                                     MainTask worker = new MainTask(jobEntry);
279                                     fjPool.execute(worker);
280                                     jobAddedToPool = true;
281                                 } else {
282                                     it.remove();
283                                 }
284                             }
285                         }
286                     }
287
288                     if (!jobAddedToPool) {
289                         TimeUnit.SECONDS.sleep(1);
290                     }
291                 } catch (Exception e) {
292                     e.printStackTrace();
293                 } catch (Throwable e) {
294                     e.printStackTrace();
295                 }
296             }
297         }
298     }
299 }