Fix unsubscribe checks in DOMNotificationRouterEvent
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
index 78ba3cc19b6b75ac1a7a40396f49d0b2654ec904..66bccef6d1bee5e29f5de1cc63cc536fbccb10e9 100644 (file)
@@ -7,14 +7,14 @@
  */
 package org.opendaylight.mdsal.dom.broker;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableMultimap.Builder;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.lmax.disruptor.EventHandler;
@@ -41,11 +41,11 @@ import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistr
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
  * routing of notifications from publishers to subscribers.
@@ -68,7 +68,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
 
     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
-    private static final ListenableFuture<Void> NO_LISTENERS = Futures.immediateFuture(null);
+    private static final ListenableFuture<Void> NO_LISTENERS = FluentFutures.immediateNullFluentFuture();
     private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(
             1L, 30L, TimeUnit.MILLISECONDS);
     private static final EventHandler<DOMNotificationRouterEvent> DISPATCH_NOTIFICATIONS =
@@ -76,45 +76,44 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     private static final EventHandler<DOMNotificationRouterEvent> NOTIFY_FUTURE =
         (event, sequence, endOfBatch) -> event.setFuture();
 
-    private final Disruptor<DOMNotificationRouterEvent> disruptor;
-    private final ExecutorService executor;
-    private volatile Multimap<SchemaPath, ListenerRegistration<? extends
-            DOMNotificationListener>> listeners = ImmutableMultimap.of();
     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
             ListenerRegistry.create();
+    private final Disruptor<DOMNotificationRouterEvent> disruptor;
     private final ScheduledThreadPoolExecutor observer;
+    private final ExecutorService executor;
+
+    private volatile Multimap<SchemaPath, AbstractListenerRegistration<? extends DOMNotificationListener>> listeners =
+            ImmutableMultimap.of();
 
-    @SuppressWarnings("unchecked")
     @VisibleForTesting
-    DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
-        this.executor = Preconditions.checkNotNull(executor);
-        this.observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
-                .setDaemon(true).setNameFormat("DOMNotificationRouter-%d").build());
-        disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY,
-                queueDepth, executor, ProducerType.MULTI, strategy);
+    DOMNotificationRouter(final int queueDepth, final WaitStrategy strategy) {
+        observer = new ScheduledThreadPoolExecutor(1,
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-observer-%d").build());
+        executor = Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-listeners-%d").build());
+        disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth,
+                new ThreadFactoryBuilder().setNameFormat("DOMNotificationRouter-disruptor-%d").build(),
+                ProducerType.MULTI, strategy);
         disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
         disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
         disruptor.start();
     }
 
     public static DOMNotificationRouter create(final int queueDepth) {
-        final ExecutorService executor = Executors.newCachedThreadPool();
-
-        return new DOMNotificationRouter(executor, queueDepth, DEFAULT_STRATEGY);
+        return new DOMNotificationRouter(queueDepth, DEFAULT_STRATEGY);
     }
 
-    public static DOMNotificationRouter create(final int queueDepth, final long spinTime,
-            final long parkTime, final TimeUnit unit) {
-        final ExecutorService executor = Executors.newCachedThreadPool();
-        final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit);
-
-        return new DOMNotificationRouter(executor, queueDepth, strategy);
+    public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime,
+            final TimeUnit unit) {
+        checkArgument(Long.lowestOneBit(queueDepth) == Long.highestOneBit(queueDepth),
+                "Queue depth %s is not power-of-two", queueDepth);
+        return new DOMNotificationRouter(queueDepth, PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit));
     }
 
     @Override
     public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
             final T listener, final Collection<SchemaPath> types) {
-        final ListenerRegistration<T> reg = new AbstractListenerRegistration<T>(listener) {
+        final AbstractListenerRegistration<T> reg = new AbstractListenerRegistration<>(listener) {
             @Override
             protected void removeRegistration() {
                 synchronized (DOMNotificationRouter.this) {
@@ -125,7 +124,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
         };
 
         if (!types.isEmpty()) {
-            final Builder<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> b =
+            final Builder<SchemaPath, AbstractListenerRegistration<? extends DOMNotificationListener>> b =
                     ImmutableMultimap.builder();
             b.putAll(listeners);
 
@@ -151,21 +150,21 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
      * @param newListeners is used to notify listenerTypes changed
      */
     private void replaceListeners(
-            final Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> newListeners) {
+            final Multimap<SchemaPath, AbstractListenerRegistration<? extends DOMNotificationListener>> newListeners) {
         listeners = newListeners;
         notifyListenerTypesChanged(newListeners.keySet());
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void notifyListenerTypesChanged(final Set<SchemaPath> typesAfter) {
-        final List<ListenerRegistration<DOMNotificationSubscriptionListener>> listenersAfter =
-                ImmutableList.copyOf(subscriptionListeners.getListeners());
-        executor.submit(() -> {
-            for (final ListenerRegistration<DOMNotificationSubscriptionListener> subListener : listenersAfter) {
+        final List<? extends DOMNotificationSubscriptionListener> listenersAfter =
+                subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
+        executor.execute(() -> {
+            for (final DOMNotificationSubscriptionListener subListener : listenersAfter) {
                 try {
-                    subListener.getInstance().onSubscriptionChanged(typesAfter);
+                    subListener.onSubscriptionChanged(typesAfter);
                 } catch (final Exception e) {
-                    LOG.warn("Uncaught exception during invoking listener {}", subListener.getInstance(), e);
+                    LOG.warn("Uncaught exception during invoking listener {}", subListener, e);
                 }
             }
         });
@@ -175,12 +174,12 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
             final L listener) {
         final Set<SchemaPath> initialTypes = listeners.keySet();
-        executor.submit(() -> listener.onSubscriptionChanged(initialTypes));
-        return subscriptionListeners.registerWithType(listener);
+        executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
+        return subscriptionListeners.register(listener);
     }
 
     private ListenableFuture<Void> publish(final long seq, final DOMNotification notification,
-            final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+            final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
         final DOMNotificationRouterEvent event = disruptor.get(seq);
         final ListenableFuture<Void> future = event.initialize(notification, subscribers);
         disruptor.getRingBuffer().publish(seq);
@@ -190,7 +189,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     @Override
     public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
             throws InterruptedException {
-        final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+        final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
                 listeners.get(notification.getType());
         if (subscribers.isEmpty()) {
             return NO_LISTENERS;
@@ -203,7 +202,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     @SuppressWarnings("checkstyle:IllegalCatch")
     @VisibleForTesting
     ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
-            final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+            final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
         final long seq;
         try {
             seq = disruptor.getRingBuffer().tryNext();
@@ -216,7 +215,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
 
     @Override
     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
-        final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+        final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
                 listeners.get(notification.getType());
         if (subscribers.isEmpty()) {
             return NO_LISTENERS;
@@ -228,7 +227,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     @Override
     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
             final TimeUnit unit) throws InterruptedException {
-        final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+        final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
                 listeners.get(notification.getType());
         if (subscribers.isEmpty()) {
             return NO_LISTENERS;
@@ -255,8 +254,8 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
 
     @Override
     public void close() {
-        observer.shutdown();
         disruptor.shutdown();
+        observer.shutdown();
         executor.shutdown();
     }
 
@@ -265,6 +264,11 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
         return executor;
     }
 
+    @VisibleForTesting
+    ExecutorService observer() {
+        return observer;
+    }
+
     @VisibleForTesting
     Multimap<SchemaPath, ?> listeners() {
         return listeners;
@@ -274,5 +278,4 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners() {
         return subscriptionListeners;
     }
-
 }