d2d384442190934a6d35e9760143985cfe7b2873
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / nodeconfigurator / NodeConfiguratorImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frm.nodeconfigurator;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import com.google.errorprone.annotations.Var;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import org.opendaylight.infrautils.utils.concurrent.LoggingUncaughtThreadDeathContextRunnable;
20 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
21 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
22 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public class NodeConfiguratorImpl implements NodeConfigurator {
27
28     private static final Logger LOG = LoggerFactory.getLogger(NodeConfiguratorImpl.class);
29     private static final String NODE_EXECUTOR_PREFIX = "nc-exe-";
30
31     private final NotificationManager<String, JobEntry<?>> manager;
32     private final ExecutorService syncThreadPool;
33
34     public NodeConfiguratorImpl() {
35         syncThreadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
36                 .setNameFormat(NODE_EXECUTOR_PREFIX + "%d")
37                 .setDaemon(true)
38                 .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
39                 .build());
40         manager = QueuedNotificationManager.create(syncThreadPool, (key, entries) -> {
41             LOG.trace("Executing jobs with key: {}", key);
42             entries.forEach(jobEntry -> new MainTask<>(jobEntry).run());
43             LOG.trace("Finished executing jobs with key: {}", key);
44         }, 4096, "nc-jobqueue");
45     }
46
47     @Override
48     public <T> ListenableFuture<T> enqueueJob(final String key, final Callable<ListenableFuture<T>> mainWorker) {
49         final JobEntry<T> jobEntry = new JobEntry<>(key, mainWorker);
50         LOG.trace("Enqueueing job {} with key: {}", jobEntry, key);
51         manager.submitNotification(key, jobEntry);
52         return jobEntry.getResultFuture();
53     }
54
55     @Override
56     public void close() {
57         LOG.info("NodeConfigurator shutting down... (tasks still running may be stopped/cancelled/interrupted)");
58         syncThreadPool.shutdownNow();
59         LOG.info("NodeConfigurator now closed for business.");
60     }
61
62     private static final class MainTask<T> extends LoggingUncaughtThreadDeathContextRunnable {
63         private final JobEntry<T> jobEntry;
64
65         MainTask(final JobEntry<T> jobEntry) {
66             super(LOG, jobEntry::toString);
67             this.jobEntry = jobEntry;
68         }
69
70         @Override
71         @SuppressWarnings("checkstyle:illegalcatch")
72         public void runWithUncheckedExceptionLogging() {
73             @Var ListenableFuture<T> future = null;
74             LOG.trace("Running job with key: {}", jobEntry.getKey());
75
76             try {
77                 Callable<ListenableFuture<T>> mainWorker = jobEntry.getMainWorker();
78                 if (mainWorker != null) {
79                     future = mainWorker.call();
80                 } else {
81                     LOG.error("Unexpected no (null) main worker on job: {}", jobEntry);
82                 }
83
84             } catch (Exception e) {
85                 LOG.error("Direct Exception (not failed Future) when executing job, won't even retry: {}", jobEntry, e);
86             }
87
88             if (future != null) {
89                 Futures.addCallback(future, new FutureCallback<T>() {
90                     @Override
91                     public void onSuccess(final T result) {
92                         LOG.trace("Job completed successfully: {}", jobEntry.getKey());
93                         jobEntry.setResultFuture(result);
94                     }
95
96                     @Override
97                     public void onFailure(final Throwable cause) {
98                         LOG.error("Job {} failed", jobEntry.getKey(), cause);
99                     }
100                 }, MoreExecutors.directExecutor());
101             } else {
102                 jobEntry.setResultFuture(null);
103             }
104
105             LOG.trace("Finished running job with key: {}", jobEntry.getKey());
106         }
107     }
108 }