import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.errorprone.annotations.Var;
-import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.opendaylight.infrautils.utils.concurrent.LoggingUncaughtThreadDeathContextRunnable;
import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
-import org.opendaylight.yangtools.util.concurrent.ThreadFactoryProvider;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(NodeConfiguratorImpl.class);
private static final String NODE_EXECUTOR_PREFIX = "nc-exe-";
- private final Map<String, JobQueue> jobQueueMap = new ConcurrentHashMap<>();
- private final AtomicBoolean jobQueueHandlerThreadStarted = new AtomicBoolean(false);
- private final Thread jobQueueHandlerThread;
- private volatile boolean shutdown = false;
- private final ListeningExecutorService syncThreadPool;
+ private final NotificationManager<String, JobEntry<?>> manager;
+ private final ExecutorService syncThreadPool;
public NodeConfiguratorImpl() {
- jobQueueHandlerThread = ThreadFactoryProvider.builder()
- .namePrefix("nc-jobqueue")
- .logger(LOG)
- .build().get()
- .newThread(new JobQueueHandler());
- final ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+ syncThreadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(NODE_EXECUTOR_PREFIX + "%d")
.setDaemon(true)
.setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
.build());
- syncThreadPool = MoreExecutors.listeningDecorator(executorService);
- }
-
- private void signalForNextJob() {
- if (jobQueueHandlerThreadStarted.compareAndSet(false, true)) {
- jobQueueHandlerThread.start();
- }
+ manager = QueuedNotificationManager.create(syncThreadPool, (key, entries) -> {
+ LOG.trace("Executing job with key: {}", key);
+ entries.forEach(jobEntry -> new MainTask<>(jobEntry).run());
+ }, 4096, "nc-jobqueue");
}
@Override
public <T> ListenableFuture<T> enqueueJob(final String key, final Callable<ListenableFuture<T>> mainWorker) {
-
- JobEntry<T> jobEntry = new JobEntry<>(key, mainWorker);
- JobQueue jobQueue = jobQueueMap.computeIfAbsent(key, mapKey -> new JobQueue());
- jobQueue.addEntry(jobEntry);
- signalForNextJob();
-
+ final JobEntry<T> jobEntry = new JobEntry<>(key, mainWorker);
+ manager.submitNotification(key, jobEntry);
return jobEntry.getResultFuture();
}
@Override
- public void close() throws Exception {
- {
- LOG.info("NodeConfigurator shutting down... (tasks still running may be stopped/cancelled/interrupted)");
- syncThreadPool.shutdownNow();
- try {
- jobQueueHandlerThread.join(10000);
- } catch (InterruptedException e) {
- // Shouldn't get interrupted - either way we don't care.
- }
-
- LOG.info("NodeConfigurator now closed for business.");
- }
+ public void close() {
+ LOG.info("NodeConfigurator shutting down... (tasks still running may be stopped/cancelled/interrupted)");
+ syncThreadPool.shutdownNow();
+ LOG.info("NodeConfigurator now closed for business.");
}
- private class JobQueueHandler implements Runnable {
- @Override
- @SuppressWarnings("checkstyle:illegalcatch")
- public void run() {
- LOG.info("Starting JobQueue Handler Thread");
- while (true) {
- try {
- for (Map.Entry<String, JobQueue> entry : jobQueueMap.entrySet()) {
- if (shutdown) {
- break;
- }
- JobQueue jobQueue = entry.getValue();
- if (jobQueue.getExecutingEntry() != null) {
- continue;
- }
- JobEntry<?> jobEntry = jobQueue.poll();
- if (jobEntry == null) {
- // job queue is empty. so continue with next job queue entry
- continue;
- }
- jobQueue.setExecutingEntry(jobEntry);
- MainTask<?> worker = new MainTask<>(jobEntry);
- LOG.trace("Executing job with key: {}", jobEntry.getKey());
- executeTask(worker) ;
- }
- } catch (Exception e) {
- LOG.error("Exception while executing the tasks", e);
- }
- }
- }
-
- }
-
- private class MainTask<T> extends LoggingUncaughtThreadDeathContextRunnable {
+ private static final class MainTask<T> extends LoggingUncaughtThreadDeathContextRunnable {
private final JobEntry<T> jobEntry;
MainTask(final JobEntry<T> jobEntry) {
if (future == null) {
jobEntry.setResultFuture(null);
- clearJob(jobEntry);
return;
}
- clearJob(jobEntry);
Futures.addCallback(future, new FutureCallback<T>() {
@Override
public void onSuccess(final T result) {
LOG.trace("Job completed successfully: {}", jobEntry.getKey());
jobEntry.setResultFuture(result);
- clearJob(jobEntry);
}
@Override
public void onFailure(final Throwable cause) {
- clearJob(jobEntry);
+
}
}, MoreExecutors.directExecutor());
}
}
-
- private void clearJob(final JobEntry<?> jobEntry) {
- String jobKey = jobEntry.getKey();
- LOG.trace("About to clear jobKey: {}", jobKey);
- JobQueue jobQueue = jobQueueMap.get(jobKey);
- if (jobQueue != null) {
- jobQueue.setExecutingEntry(null);
- } else {
- LOG.error("clearJob: jobQueueMap did not contain the key for this entry: {}", jobEntry);
- }
- }
-
- private void executeTask(final Runnable task) {
- try {
- syncThreadPool.submit(task);
- } catch (RejectedExecutionException e) {
- if (!syncThreadPool.isShutdown()) {
- LOG.error("syncThreadPool task rejected", e);
- }
- }
- }
}