Use QueuedNotificationManager to dispatch tasks 32/77732/3
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 13 Nov 2018 16:07:40 +0000 (17:07 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 13 Nov 2018 17:04:12 +0000 (18:04 +0100)
QueuedNotificationManager already provides the same execution
guarantees as NodeConfiguratorImpl while providing a non-busy-loop
dispatch and dealing with other complexities.

Use that instead of home-brewed code.

JIRA: OPNFLWPLUG-1047
Change-Id: I6ce124a007df6e1ed74a2744c5141dd6d630d18b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/nodeconfigurator/NodeConfiguratorImpl.java

index bfcc37e9106ab34888f35f8d702d6c4b6f445701..f0e206822dd7c67d3a5941455ff8513458ad7a71 100644 (file)
@@ -10,20 +10,16 @@ package org.opendaylight.openflowplugin.applications.frm.nodeconfigurator;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.errorprone.annotations.Var;
-import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.opendaylight.infrautils.utils.concurrent.LoggingUncaughtThreadDeathContextRunnable;
 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
-import org.opendaylight.yangtools.util.concurrent.ThreadFactoryProvider;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,92 +28,36 @@ public class NodeConfiguratorImpl implements NodeConfigurator {
     private static final Logger LOG = LoggerFactory.getLogger(NodeConfiguratorImpl.class);
     private static final String NODE_EXECUTOR_PREFIX = "nc-exe-";
 
-    private final Map<String, JobQueue> jobQueueMap = new ConcurrentHashMap<>();
-    private final AtomicBoolean jobQueueHandlerThreadStarted = new AtomicBoolean(false);
-    private final Thread jobQueueHandlerThread;
-    private volatile boolean shutdown = false;
-    private final ListeningExecutorService syncThreadPool;
+    private final NotificationManager<String, JobEntry<?>> manager;
+    private final ExecutorService syncThreadPool;
 
     public NodeConfiguratorImpl() {
-        jobQueueHandlerThread = ThreadFactoryProvider.builder()
-                .namePrefix("nc-jobqueue")
-                .logger(LOG)
-                .build().get()
-                .newThread(new JobQueueHandler());
-        final ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+        syncThreadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
                 .setNameFormat(NODE_EXECUTOR_PREFIX + "%d")
                 .setDaemon(true)
                 .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
                 .build());
-        syncThreadPool = MoreExecutors.listeningDecorator(executorService);
-    }
-
-    private void signalForNextJob() {
-        if (jobQueueHandlerThreadStarted.compareAndSet(false, true)) {
-            jobQueueHandlerThread.start();
-        }
+        manager = QueuedNotificationManager.create(syncThreadPool, (key, entries) -> {
+            LOG.trace("Executing job with key: {}", key);
+            entries.forEach(jobEntry -> new MainTask<>(jobEntry).run());
+        }, 4096, "nc-jobqueue");
     }
 
     @Override
     public <T> ListenableFuture<T> enqueueJob(final String key, final Callable<ListenableFuture<T>> mainWorker) {
-
-        JobEntry<T> jobEntry = new JobEntry<>(key, mainWorker);
-        JobQueue jobQueue = jobQueueMap.computeIfAbsent(key, mapKey -> new JobQueue());
-        jobQueue.addEntry(jobEntry);
-        signalForNextJob();
-
+        final JobEntry<T> jobEntry = new JobEntry<>(key, mainWorker);
+        manager.submitNotification(key, jobEntry);
         return jobEntry.getResultFuture();
     }
 
     @Override
-    public void close() throws Exception {
-        {
-            LOG.info("NodeConfigurator shutting down... (tasks still running may be stopped/cancelled/interrupted)");
-            syncThreadPool.shutdownNow();
-            try {
-                jobQueueHandlerThread.join(10000);
-            } catch (InterruptedException e) {
-                // Shouldn't get interrupted - either way we don't care.
-            }
-
-            LOG.info("NodeConfigurator now closed for business.");
-        }
+    public void close() {
+        LOG.info("NodeConfigurator shutting down... (tasks still running may be stopped/cancelled/interrupted)");
+        syncThreadPool.shutdownNow();
+        LOG.info("NodeConfigurator now closed for business.");
     }
 
-    private class JobQueueHandler implements Runnable {
-        @Override
-        @SuppressWarnings("checkstyle:illegalcatch")
-        public void run() {
-            LOG.info("Starting JobQueue Handler Thread");
-            while (true) {
-                try {
-                    for (Map.Entry<String, JobQueue> entry : jobQueueMap.entrySet()) {
-                        if (shutdown) {
-                            break;
-                        }
-                        JobQueue jobQueue = entry.getValue();
-                        if (jobQueue.getExecutingEntry() != null) {
-                            continue;
-                        }
-                        JobEntry<?> jobEntry = jobQueue.poll();
-                        if (jobEntry == null) {
-                            // job queue is empty. so continue with next job queue entry
-                            continue;
-                        }
-                        jobQueue.setExecutingEntry(jobEntry);
-                        MainTask<?> worker = new MainTask<>(jobEntry);
-                        LOG.trace("Executing job with key: {}", jobEntry.getKey());
-                        executeTask(worker) ;
-                    }
-                } catch (Exception e) {
-                    LOG.error("Exception while executing the tasks", e);
-                }
-            }
-        }
-
-    }
-
-    private class MainTask<T> extends LoggingUncaughtThreadDeathContextRunnable {
+    private static final class MainTask<T> extends LoggingUncaughtThreadDeathContextRunnable {
         private final JobEntry<T> jobEntry;
 
         MainTask(final JobEntry<T> jobEntry) {
@@ -145,44 +85,20 @@ public class NodeConfiguratorImpl implements NodeConfigurator {
 
             if (future == null) {
                 jobEntry.setResultFuture(null);
-                clearJob(jobEntry);
                 return;
             }
-            clearJob(jobEntry);
             Futures.addCallback(future, new FutureCallback<T>() {
                 @Override
                 public void onSuccess(final T result) {
                     LOG.trace("Job completed successfully: {}", jobEntry.getKey());
                     jobEntry.setResultFuture(result);
-                    clearJob(jobEntry);
                 }
 
                 @Override
                 public void onFailure(final Throwable cause) {
-                    clearJob(jobEntry);
+
                 }
             }, MoreExecutors.directExecutor());
         }
     }
-
-    private void clearJob(final JobEntry<?> jobEntry) {
-        String jobKey = jobEntry.getKey();
-        LOG.trace("About to clear jobKey: {}", jobKey);
-        JobQueue jobQueue = jobQueueMap.get(jobKey);
-        if (jobQueue != null) {
-            jobQueue.setExecutingEntry(null);
-        } else {
-            LOG.error("clearJob: jobQueueMap did not contain the key for this entry: {}", jobEntry);
-        }
-    }
-
-    private void executeTask(final Runnable task) {
-        try {
-            syncThreadPool.submit(task);
-        } catch (RejectedExecutionException e) {
-            if (!syncThreadPool.isShutdown()) {
-                LOG.error("syncThreadPool task rejected", e);
-            }
-        }
-    }
 }