Bug 8531:Implementation for offerNotification method with timeout in DOMNotificationR...
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
index 9775a512a31262e552bd6b70f31d4efa93a886d1..78ba3cc19b6b75ac1a7a40396f49d0b2654ec904 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.InsufficientCapacityException;
 import com.lmax.disruptor.PhasedBackoffWaitStrategy;
@@ -28,6 +29,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
@@ -42,6 +45,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
  * routing of notifications from publishers to subscribers.
@@ -60,7 +64,7 @@ import org.slf4j.LoggerFactory;
  * #offerNotification(DOMNotification, long, TimeUnit)}
  * is realized by arming a background wakeup interrupt.
  */
-public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
+public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
 
     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
@@ -78,11 +82,14 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
             DOMNotificationListener>> listeners = ImmutableMultimap.of();
     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
             ListenerRegistry.create();
+    private final ScheduledThreadPoolExecutor observer;
 
     @SuppressWarnings("unchecked")
-    private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
+    @VisibleForTesting
+    DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
         this.executor = Preconditions.checkNotNull(executor);
-
+        this.observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
+                .setDaemon(true).setNameFormat("DOMNotificationRouter-%d").build());
         disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY,
                 queueDepth, executor, ProducerType.MULTI, strategy);
         disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
@@ -194,7 +201,8 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
+    @VisibleForTesting
+    ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
             final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
         final long seq;
         try {
@@ -225,22 +233,29 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
         if (subscribers.isEmpty()) {
             return NO_LISTENERS;
         }
-
         // Attempt to perform a non-blocking publish first
-        final ListenableFuture<? extends Object> noBlock = tryPublish(notification, subscribers);
+        final ListenableFuture<?> noBlock = tryPublish(notification, subscribers);
         if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
             return noBlock;
         }
 
-        /*
-         * FIXME: we need a background thread, which will watch out for blocking too long. Here
-         *        we will arm a tasklet for it and synchronize delivery of interrupt properly.
-         */
-        throw new UnsupportedOperationException("Not implemented yet");
+        try {
+            final Thread publishThread = Thread.currentThread();
+            ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
+            final ListenableFuture<?> withBlock = putNotification(notification);
+            timerTask.cancel(true);
+            if (observer.getQueue().size() > 50) {
+                observer.purge();
+            }
+            return withBlock;
+        } catch (InterruptedException e) {
+            return DOMNotificationPublishService.REJECTED;
+        }
     }
 
     @Override
     public void close() {
+        observer.shutdown();
         disruptor.shutdown();
         executor.shutdown();
     }