2 * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.frm.nodeconfigurator;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import com.google.errorprone.annotations.Var;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import org.opendaylight.infrautils.utils.concurrent.LoggingUncaughtThreadDeathContextRunnable;
20 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
21 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
22 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 public class NodeConfiguratorImpl implements NodeConfigurator {
28 private static final Logger LOG = LoggerFactory.getLogger(NodeConfiguratorImpl.class);
29 private static final String NODE_EXECUTOR_PREFIX = "nc-exe-";
31 private final NotificationManager<String, JobEntry<?>> manager;
32 private final ExecutorService syncThreadPool;
34 public NodeConfiguratorImpl() {
35 syncThreadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
36 .setNameFormat(NODE_EXECUTOR_PREFIX + "%d")
38 .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
40 manager = QueuedNotificationManager.create(syncThreadPool, (key, entries) -> {
41 LOG.trace("Executing jobs with key: {}", key);
42 entries.forEach(jobEntry -> new MainTask<>(jobEntry).run());
43 LOG.trace("Finished executing jobs with key: {}", key);
44 }, 4096, "nc-jobqueue");
48 public <T> ListenableFuture<T> enqueueJob(final String key, final Callable<ListenableFuture<T>> mainWorker) {
49 final JobEntry<T> jobEntry = new JobEntry<>(key, mainWorker);
50 LOG.trace("Enqueueing job {} with key: {}", jobEntry, key);
51 manager.submitNotification(key, jobEntry);
52 return jobEntry.getResultFuture();
57 LOG.info("NodeConfigurator shutting down... (tasks still running may be stopped/cancelled/interrupted)");
58 syncThreadPool.shutdownNow();
59 LOG.info("NodeConfigurator now closed for business.");
62 private static final class MainTask<T> extends LoggingUncaughtThreadDeathContextRunnable {
63 private final JobEntry<T> jobEntry;
65 MainTask(final JobEntry<T> jobEntry) {
66 super(LOG, jobEntry::toString);
67 this.jobEntry = jobEntry;
71 @SuppressWarnings("checkstyle:illegalcatch")
72 public void runWithUncheckedExceptionLogging() {
73 @Var ListenableFuture<T> future = null;
74 LOG.trace("Running job with key: {}", jobEntry.getKey());
77 Callable<ListenableFuture<T>> mainWorker = jobEntry.getMainWorker();
78 if (mainWorker != null) {
79 future = mainWorker.call();
81 LOG.error("Unexpected no (null) main worker on job: {}", jobEntry);
84 } catch (Exception e) {
85 LOG.error("Direct Exception (not failed Future) when executing job, won't even retry: {}", jobEntry, e);
89 Futures.addCallback(future, new FutureCallback<T>() {
91 public void onSuccess(final T result) {
92 LOG.trace("Job completed successfully: {}", jobEntry.getKey());
93 jobEntry.setResultFuture(result);
97 public void onFailure(final Throwable cause) {
98 LOG.error("Job {} failed", jobEntry.getKey(), cause);
100 }, MoreExecutors.directExecutor());
102 jobEntry.setResultFuture(null);
105 LOG.trace("Finished running job with key: {}", jobEntry.getKey());