Fix raw types in NodeConfiguratorImpl
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / nodeconfigurator / NodeConfiguratorImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frm.nodeconfigurator;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import com.google.errorprone.annotations.Var;
17 import java.util.Map;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.RejectedExecutionException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import org.opendaylight.infrautils.utils.concurrent.LoggingUncaughtThreadDeathContextRunnable;
25 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
26 import org.opendaylight.yangtools.util.concurrent.ThreadFactoryProvider;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 public class NodeConfiguratorImpl implements NodeConfigurator {
31
32     private static final Logger LOG = LoggerFactory.getLogger(NodeConfiguratorImpl.class);
33     private static final String NODE_EXECUTOR_PREFIX = "nc-exe-";
34
35     private final Map<String, JobQueue> jobQueueMap = new ConcurrentHashMap<>();
36     private final AtomicBoolean jobQueueHandlerThreadStarted = new AtomicBoolean(false);
37     private final Thread jobQueueHandlerThread;
38     private volatile boolean shutdown = false;
39     private final ListeningExecutorService syncThreadPool;
40
41     public NodeConfiguratorImpl() {
42         jobQueueHandlerThread = ThreadFactoryProvider.builder()
43                 .namePrefix("nc-jobqueue")
44                 .logger(LOG)
45                 .build().get()
46                 .newThread(new JobQueueHandler());
47         final ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
48                 .setNameFormat(NODE_EXECUTOR_PREFIX + "%d")
49                 .setDaemon(true)
50                 .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
51                 .build());
52         syncThreadPool = MoreExecutors.listeningDecorator(executorService);
53     }
54
55     private void signalForNextJob() {
56         if (jobQueueHandlerThreadStarted.compareAndSet(false, true)) {
57             jobQueueHandlerThread.start();
58         }
59     }
60
61     @Override
62     public <T> ListenableFuture<T> enqueueJob(final String key, final Callable<ListenableFuture<T>> mainWorker) {
63
64         JobEntry<T> jobEntry = new JobEntry<>(key, mainWorker);
65         JobQueue jobQueue = jobQueueMap.computeIfAbsent(key, mapKey -> new JobQueue());
66         jobQueue.addEntry(jobEntry);
67         signalForNextJob();
68
69         return jobEntry.getResultFuture();
70     }
71
72     @Override
73     public void close() throws Exception {
74         {
75             LOG.info("NodeConfigurator shutting down... (tasks still running may be stopped/cancelled/interrupted)");
76             syncThreadPool.shutdownNow();
77             try {
78                 jobQueueHandlerThread.join(10000);
79             } catch (InterruptedException e) {
80                 // Shouldn't get interrupted - either way we don't care.
81             }
82
83             LOG.info("NodeConfigurator now closed for business.");
84         }
85     }
86
87     private class JobQueueHandler implements Runnable {
88         @Override
89         @SuppressWarnings("checkstyle:illegalcatch")
90         public void run() {
91             LOG.info("Starting JobQueue Handler Thread");
92             while (true) {
93                 try {
94                     for (Map.Entry<String, JobQueue> entry : jobQueueMap.entrySet()) {
95                         if (shutdown) {
96                             break;
97                         }
98                         JobQueue jobQueue = entry.getValue();
99                         if (jobQueue.getExecutingEntry() != null) {
100                             continue;
101                         }
102                         JobEntry<?> jobEntry = jobQueue.poll();
103                         if (jobEntry == null) {
104                             // job queue is empty. so continue with next job queue entry
105                             continue;
106                         }
107                         jobQueue.setExecutingEntry(jobEntry);
108                         MainTask<?> worker = new MainTask<>(jobEntry);
109                         LOG.trace("Executing job with key: {}", jobEntry.getKey());
110                         executeTask(worker) ;
111                     }
112                 } catch (Exception e) {
113                     LOG.error("Exception while executing the tasks", e);
114                 }
115             }
116         }
117
118     }
119
120     private class MainTask<T> extends LoggingUncaughtThreadDeathContextRunnable {
121         private final JobEntry<T> jobEntry;
122
123         MainTask(final JobEntry<T> jobEntry) {
124             super(LOG, jobEntry::toString);
125             this.jobEntry = jobEntry;
126         }
127
128         @Override
129         @SuppressWarnings("checkstyle:illegalcatch")
130         public void runWithUncheckedExceptionLogging() {
131             @Var ListenableFuture<T> future = null;
132             LOG.trace("Running job with key: {}", jobEntry.getKey());
133
134             try {
135                 Callable<ListenableFuture<T>> mainWorker = jobEntry.getMainWorker();
136                 if (mainWorker != null) {
137                     future = mainWorker.call();
138                 } else {
139                     LOG.error("Unexpected no (null) main worker on job: {}", jobEntry);
140                 }
141
142             } catch (Exception e) {
143                 LOG.error("Direct Exception (not failed Future) when executing job, won't even retry: {}", jobEntry, e);
144             }
145
146             if (future == null) {
147                 jobEntry.setResultFuture(null);
148                 clearJob(jobEntry);
149                 return;
150             }
151             clearJob(jobEntry);
152             Futures.addCallback(future, new FutureCallback<T>() {
153                 @Override
154                 public void onSuccess(final T result) {
155                     LOG.trace("Job completed successfully: {}", jobEntry.getKey());
156                     jobEntry.setResultFuture(result);
157                     clearJob(jobEntry);
158                 }
159
160                 @Override
161                 public void onFailure(final Throwable cause) {
162                     clearJob(jobEntry);
163                 }
164             }, MoreExecutors.directExecutor());
165         }
166     }
167
168     private void clearJob(final JobEntry<?> jobEntry) {
169         String jobKey = jobEntry.getKey();
170         LOG.trace("About to clear jobKey: {}", jobKey);
171         JobQueue jobQueue = jobQueueMap.get(jobKey);
172         if (jobQueue != null) {
173             jobQueue.setExecutingEntry(null);
174         } else {
175             LOG.error("clearJob: jobQueueMap did not contain the key for this entry: {}", jobEntry);
176         }
177     }
178
179     private void executeTask(final Runnable task) {
180         try {
181             syncThreadPool.submit(task);
182         } catch (RejectedExecutionException e) {
183             if (!syncThreadPool.isShutdown()) {
184                 LOG.error("syncThreadPool task rejected", e);
185             }
186         }
187     }
188 }