Merge "improve fixed incorrect Future usage in (Bundle)FlowForwarder"
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / nodeconfigurator / NodeConfiguratorImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frm.nodeconfigurator;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import com.google.errorprone.annotations.Var;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import org.opendaylight.infrautils.utils.concurrent.LoggingUncaughtThreadDeathContextRunnable;
20 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
21 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
22 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public class NodeConfiguratorImpl implements NodeConfigurator {
27
28     private static final Logger LOG = LoggerFactory.getLogger(NodeConfiguratorImpl.class);
29     private static final String NODE_EXECUTOR_PREFIX = "nc-exe-";
30
31     private final NotificationManager<String, JobEntry<?>> manager;
32     private final ExecutorService syncThreadPool;
33
34     public NodeConfiguratorImpl() {
35         syncThreadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
36                 .setNameFormat(NODE_EXECUTOR_PREFIX + "%d")
37                 .setDaemon(true)
38                 .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
39                 .build());
40         manager = QueuedNotificationManager.create(syncThreadPool, (key, entries) -> {
41             LOG.trace("Executing job with key: {}", key);
42             entries.forEach(jobEntry -> new MainTask<>(jobEntry).run());
43         }, 4096, "nc-jobqueue");
44     }
45
46     @Override
47     public <T> ListenableFuture<T> enqueueJob(final String key, final Callable<ListenableFuture<T>> mainWorker) {
48         final JobEntry<T> jobEntry = new JobEntry<>(key, mainWorker);
49         manager.submitNotification(key, jobEntry);
50         return jobEntry.getResultFuture();
51     }
52
53     @Override
54     public void close() {
55         LOG.info("NodeConfigurator shutting down... (tasks still running may be stopped/cancelled/interrupted)");
56         syncThreadPool.shutdownNow();
57         LOG.info("NodeConfigurator now closed for business.");
58     }
59
60     private static final class MainTask<T> extends LoggingUncaughtThreadDeathContextRunnable {
61         private final JobEntry<T> jobEntry;
62
63         MainTask(final JobEntry<T> jobEntry) {
64             super(LOG, jobEntry::toString);
65             this.jobEntry = jobEntry;
66         }
67
68         @Override
69         @SuppressWarnings("checkstyle:illegalcatch")
70         public void runWithUncheckedExceptionLogging() {
71             @Var ListenableFuture<T> future = null;
72             LOG.trace("Running job with key: {}", jobEntry.getKey());
73
74             try {
75                 Callable<ListenableFuture<T>> mainWorker = jobEntry.getMainWorker();
76                 if (mainWorker != null) {
77                     future = mainWorker.call();
78                 } else {
79                     LOG.error("Unexpected no (null) main worker on job: {}", jobEntry);
80                 }
81
82             } catch (Exception e) {
83                 LOG.error("Direct Exception (not failed Future) when executing job, won't even retry: {}", jobEntry, e);
84             }
85
86             if (future == null) {
87                 jobEntry.setResultFuture(null);
88                 return;
89             }
90             Futures.addCallback(future, new FutureCallback<T>() {
91                 @Override
92                 public void onSuccess(final T result) {
93                     LOG.trace("Job completed successfully: {}", jobEntry.getKey());
94                     jobEntry.setResultFuture(result);
95                 }
96
97                 @Override
98                 public void onFailure(final Throwable cause) {
99                     LOG.error("Job {} failed", jobEntry.getKey(), cause);
100                 }
101             }, MoreExecutors.directExecutor());
102         }
103     }
104 }