Enable mdsal-dom-broker spotbugs
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
index 9775a512a31262e552bd6b70f31d4efa93a886d1..f84ef6699efd2c3646f6d3a279a475960e94671e 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.InsufficientCapacityException;
 import com.lmax.disruptor.PhasedBackoffWaitStrategy;
@@ -28,6 +29,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
@@ -60,7 +63,7 @@ import org.slf4j.LoggerFactory;
  * #offerNotification(DOMNotification, long, TimeUnit)}
  * is realized by arming a background wakeup interrupt.
  */
-public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
+public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
 
     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
@@ -78,11 +81,13 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
             DOMNotificationListener>> listeners = ImmutableMultimap.of();
     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
             ListenerRegistry.create();
+    private final ScheduledThreadPoolExecutor observer;
 
-    @SuppressWarnings("unchecked")
-    private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
+    @VisibleForTesting
+    DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
         this.executor = Preconditions.checkNotNull(executor);
-
+        this.observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
+                .setDaemon(true).setNameFormat("DOMNotificationRouter-%d").build());
         disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY,
                 queueDepth, executor, ProducerType.MULTI, strategy);
         disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
@@ -98,6 +103,8 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
 
     public static DOMNotificationRouter create(final int queueDepth, final long spinTime,
             final long parkTime, final TimeUnit unit) {
+        Preconditions.checkArgument(Long.lowestOneBit(queueDepth) == Long.highestOneBit(queueDepth),
+                "Queue depth %s is not power-of-two", queueDepth);
         final ExecutorService executor = Executors.newCachedThreadPool();
         final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit);
 
@@ -153,7 +160,7 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
     private void notifyListenerTypesChanged(final Set<SchemaPath> typesAfter) {
         final List<ListenerRegistration<DOMNotificationSubscriptionListener>> listenersAfter =
                 ImmutableList.copyOf(subscriptionListeners.getListeners());
-        executor.submit(() -> {
+        executor.execute(() -> {
             for (final ListenerRegistration<DOMNotificationSubscriptionListener> subListener : listenersAfter) {
                 try {
                     subListener.getInstance().onSubscriptionChanged(typesAfter);
@@ -168,7 +175,7 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
     public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
             final L listener) {
         final Set<SchemaPath> initialTypes = listeners.keySet();
-        executor.submit(() -> listener.onSubscriptionChanged(initialTypes));
+        executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
         return subscriptionListeners.registerWithType(listener);
     }
 
@@ -194,7 +201,8 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
+    @VisibleForTesting
+    ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
             final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
         final long seq;
         try {
@@ -225,22 +233,29 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
         if (subscribers.isEmpty()) {
             return NO_LISTENERS;
         }
-
         // Attempt to perform a non-blocking publish first
-        final ListenableFuture<? extends Object> noBlock = tryPublish(notification, subscribers);
+        final ListenableFuture<?> noBlock = tryPublish(notification, subscribers);
         if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
             return noBlock;
         }
 
-        /*
-         * FIXME: we need a background thread, which will watch out for blocking too long. Here
-         *        we will arm a tasklet for it and synchronize delivery of interrupt properly.
-         */
-        throw new UnsupportedOperationException("Not implemented yet");
+        try {
+            final Thread publishThread = Thread.currentThread();
+            ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
+            final ListenableFuture<?> withBlock = putNotification(notification);
+            timerTask.cancel(true);
+            if (observer.getQueue().size() > 50) {
+                observer.purge();
+            }
+            return withBlock;
+        } catch (InterruptedException e) {
+            return DOMNotificationPublishService.REJECTED;
+        }
     }
 
     @Override
     public void close() {
+        observer.shutdown();
         disruptor.shutdown();
         executor.shutdown();
     }