2 * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.frm.nodeconfigurator;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import com.google.errorprone.annotations.Var;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.RejectedExecutionException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import org.opendaylight.infrautils.utils.concurrent.LoggingUncaughtThreadDeathContextRunnable;
25 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
26 import org.opendaylight.yangtools.util.concurrent.ThreadFactoryProvider;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 public class NodeConfiguratorImpl implements NodeConfigurator {
32 private static final Logger LOG = LoggerFactory.getLogger(NodeConfiguratorImpl.class);
33 private static final String NODE_EXECUTOR_PREFIX = "nc-exe-";
35 private final Map<String, JobQueue> jobQueueMap = new ConcurrentHashMap<>();
36 private final AtomicBoolean jobQueueHandlerThreadStarted = new AtomicBoolean(false);
37 private final Thread jobQueueHandlerThread;
38 private volatile boolean shutdown = false;
39 private final ListeningExecutorService syncThreadPool;
41 public NodeConfiguratorImpl() {
42 jobQueueHandlerThread = ThreadFactoryProvider.builder()
43 .namePrefix("nc-jobqueue")
46 .newThread(new JobQueueHandler());
47 final ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
48 .setNameFormat(NODE_EXECUTOR_PREFIX + "%d")
50 .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
52 syncThreadPool = MoreExecutors.listeningDecorator(executorService);
55 private void signalForNextJob() {
56 if (jobQueueHandlerThreadStarted.compareAndSet(false, true)) {
57 jobQueueHandlerThread.start();
62 public <T> ListenableFuture<T> enqueueJob(String key, Callable<ListenableFuture<T>> mainWorker) {
64 JobEntry jobEntry = new JobEntry(key, mainWorker);
65 JobQueue jobQueue = jobQueueMap.computeIfAbsent(key, mapKey -> new JobQueue());
66 jobQueue.addEntry(jobEntry);
69 return jobEntry.getResultFuture();
73 public void close() throws Exception {
75 LOG.info("NodeConfigurator shutting down... (tasks still running may be stopped/cancelled/interrupted)");
76 syncThreadPool.shutdownNow();
78 jobQueueHandlerThread.join(10000);
79 } catch (InterruptedException e) {
80 // Shouldn't get interrupted - either way we don't care.
83 LOG.info("NodeConfigurator now closed for business.");
87 private class JobQueueHandler implements Runnable {
89 @SuppressWarnings("checkstyle:illegalcatch")
91 LOG.info("Starting JobQueue Handler Thread");
94 for (Map.Entry<String, JobQueue> entry : jobQueueMap.entrySet()) {
98 JobQueue jobQueue = entry.getValue();
99 if (jobQueue.getExecutingEntry() != null) {
102 JobEntry jobEntry = jobQueue.poll();
103 if (jobEntry == null) {
104 // job queue is empty. so continue with next job queue entry
107 jobQueue.setExecutingEntry(jobEntry);
108 MainTask worker = new MainTask(jobEntry);
109 LOG.trace("Executing job with key: {}", jobEntry.getKey());
110 executeTask(worker) ;
112 } catch (Exception e) {
113 LOG.error("Exception while executing the tasks", e);
120 private class MainTask<T> extends LoggingUncaughtThreadDeathContextRunnable {
121 private final JobEntry jobEntry;
123 MainTask(JobEntry jobEntry) {
124 super(LOG, jobEntry::toString);
125 this.jobEntry = jobEntry;
129 @SuppressWarnings("checkstyle:illegalcatch")
130 public void runWithUncheckedExceptionLogging() {
131 @Var ListenableFuture<T> future = null;
132 LOG.trace("Running job with key: {}", jobEntry.getKey());
135 Callable<ListenableFuture<T>> mainWorker = jobEntry.getMainWorker();
136 if (mainWorker != null) {
137 future = mainWorker.call();
139 LOG.error("Unexpected no (null) main worker on job: {}", jobEntry);
142 } catch (Exception e) {
143 LOG.error("Direct Exception (not failed Future) when executing job, won't even retry: {}", jobEntry, e);
146 if (future == null) {
147 jobEntry.setResultFuture(null);
152 Futures.addCallback(future, new JobCallback(jobEntry), MoreExecutors.directExecutor());
156 private class JobCallback<T> implements FutureCallback<T> {
157 private final JobEntry jobEntry;
159 JobCallback(JobEntry jobEntry) {
160 this.jobEntry = jobEntry;
164 * This implies that all the future instances have returned success. --
168 public void onSuccess(T result) {
169 LOG.trace("Job completed successfully: {}", jobEntry.getKey());
170 jobEntry.setResultFuture(result);
175 public void onFailure(Throwable throwable) {
181 private void clearJob(JobEntry jobEntry) {
182 String jobKey = jobEntry.getKey();
183 LOG.trace("About to clear jobKey: {}", jobKey);
184 JobQueue jobQueue = jobQueueMap.get(jobKey);
185 if (jobQueue != null) {
186 jobQueue.setExecutingEntry(null);
188 LOG.error("clearJob: jobQueueMap did not contain the key for this entry: {}", jobEntry);
192 private void executeTask(Runnable task) {
194 syncThreadPool.submit(task);
195 } catch (RejectedExecutionException e) {
196 if (!syncThreadPool.isShutdown()) {
197 LOG.error("syncThreadPool task rejected", e);