X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=dom%2Fmdsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Fdom%2Fbroker%2FDOMNotificationRouter.java;h=66bccef6d1bee5e29f5de1cc63cc536fbccb10e9;hb=91749a5a5fb089e74306f288d786acb8d3c450ae;hp=e8a307c95841474ed3afdf7e86cd195a40981475;hpb=0f351bbc28ddf2cddfe30c8d018646d81953fa17;p=mdsal.git diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java index e8a307c958..66bccef6d1 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java @@ -7,14 +7,14 @@ */ package org.opendaylight.mdsal.dom.broker; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableMultimap.Builder; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.lmax.disruptor.EventHandler; @@ -41,11 +41,11 @@ import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistr import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.util.ListenerRegistry; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides * routing of notifications from publishers to subscribers. @@ -68,7 +68,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl DOMNotificationService, DOMNotificationSubscriptionListenerRegistry { private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class); - private static final ListenableFuture NO_LISTENERS = Futures.immediateFuture(null); + private static final ListenableFuture NO_LISTENERS = FluentFutures.immediateNullFluentFuture(); private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock( 1L, 30L, TimeUnit.MILLISECONDS); private static final EventHandler DISPATCH_NOTIFICATIONS = @@ -76,47 +76,44 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl private static final EventHandler NOTIFY_FUTURE = (event, sequence, endOfBatch) -> event.setFuture(); - private final Disruptor disruptor; - private final ExecutorService executor; - private volatile Multimap> listeners = ImmutableMultimap.of(); private final ListenerRegistry subscriptionListeners = ListenerRegistry.create(); + private final Disruptor disruptor; private final ScheduledThreadPoolExecutor observer; + private final ExecutorService executor; + + private volatile Multimap> listeners = + ImmutableMultimap.of(); - @SuppressWarnings("unchecked") @VisibleForTesting - DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) { - this.executor = Preconditions.checkNotNull(executor); - this.observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("DOMNotificationRouter-%d").build()); - disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, - queueDepth, executor, ProducerType.MULTI, strategy); + DOMNotificationRouter(final int queueDepth, final WaitStrategy strategy) { + observer = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-observer-%d").build()); + executor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-listeners-%d").build()); + disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, + new ThreadFactoryBuilder().setNameFormat("DOMNotificationRouter-disruptor-%d").build(), + ProducerType.MULTI, strategy); disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS); disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE); disruptor.start(); } public static DOMNotificationRouter create(final int queueDepth) { - final ExecutorService executor = Executors.newCachedThreadPool(); - - return new DOMNotificationRouter(executor, queueDepth, DEFAULT_STRATEGY); + return new DOMNotificationRouter(queueDepth, DEFAULT_STRATEGY); } - public static DOMNotificationRouter create(final int queueDepth, final long spinTime, - final long parkTime, final TimeUnit unit) { - Preconditions.checkArgument(Long.lowestOneBit(queueDepth) == Long.highestOneBit(queueDepth), + public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime, + final TimeUnit unit) { + checkArgument(Long.lowestOneBit(queueDepth) == Long.highestOneBit(queueDepth), "Queue depth %s is not power-of-two", queueDepth); - final ExecutorService executor = Executors.newCachedThreadPool(); - final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit); - - return new DOMNotificationRouter(executor, queueDepth, strategy); + return new DOMNotificationRouter(queueDepth, PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit)); } @Override public synchronized ListenerRegistration registerNotificationListener( final T listener, final Collection types) { - final ListenerRegistration reg = new AbstractListenerRegistration(listener) { + final AbstractListenerRegistration reg = new AbstractListenerRegistration<>(listener) { @Override protected void removeRegistration() { synchronized (DOMNotificationRouter.this) { @@ -127,7 +124,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl }; if (!types.isEmpty()) { - final Builder> b = + final Builder> b = ImmutableMultimap.builder(); b.putAll(listeners); @@ -153,21 +150,21 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl * @param newListeners is used to notify listenerTypes changed */ private void replaceListeners( - final Multimap> newListeners) { + final Multimap> newListeners) { listeners = newListeners; notifyListenerTypesChanged(newListeners.keySet()); } @SuppressWarnings("checkstyle:IllegalCatch") private void notifyListenerTypesChanged(final Set typesAfter) { - final List> listenersAfter = - ImmutableList.copyOf(subscriptionListeners.getListeners()); - executor.submit(() -> { - for (final ListenerRegistration subListener : listenersAfter) { + final List listenersAfter = + subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList()); + executor.execute(() -> { + for (final DOMNotificationSubscriptionListener subListener : listenersAfter) { try { - subListener.getInstance().onSubscriptionChanged(typesAfter); + subListener.onSubscriptionChanged(typesAfter); } catch (final Exception e) { - LOG.warn("Uncaught exception during invoking listener {}", subListener.getInstance(), e); + LOG.warn("Uncaught exception during invoking listener {}", subListener, e); } } }); @@ -177,12 +174,12 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl public ListenerRegistration registerSubscriptionListener( final L listener) { final Set initialTypes = listeners.keySet(); - executor.submit(() -> listener.onSubscriptionChanged(initialTypes)); - return subscriptionListeners.registerWithType(listener); + executor.execute(() -> listener.onSubscriptionChanged(initialTypes)); + return subscriptionListeners.register(listener); } private ListenableFuture publish(final long seq, final DOMNotification notification, - final Collection> subscribers) { + final Collection> subscribers) { final DOMNotificationRouterEvent event = disruptor.get(seq); final ListenableFuture future = event.initialize(notification, subscribers); disruptor.getRingBuffer().publish(seq); @@ -192,7 +189,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl @Override public ListenableFuture putNotification(final DOMNotification notification) throws InterruptedException { - final Collection> subscribers = + final Collection> subscribers = listeners.get(notification.getType()); if (subscribers.isEmpty()) { return NO_LISTENERS; @@ -205,7 +202,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl @SuppressWarnings("checkstyle:IllegalCatch") @VisibleForTesting ListenableFuture tryPublish(final DOMNotification notification, - final Collection> subscribers) { + final Collection> subscribers) { final long seq; try { seq = disruptor.getRingBuffer().tryNext(); @@ -218,7 +215,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl @Override public ListenableFuture offerNotification(final DOMNotification notification) { - final Collection> subscribers = + final Collection> subscribers = listeners.get(notification.getType()); if (subscribers.isEmpty()) { return NO_LISTENERS; @@ -230,7 +227,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl @Override public ListenableFuture offerNotification(final DOMNotification notification, final long timeout, final TimeUnit unit) throws InterruptedException { - final Collection> subscribers = + final Collection> subscribers = listeners.get(notification.getType()); if (subscribers.isEmpty()) { return NO_LISTENERS; @@ -257,8 +254,8 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl @Override public void close() { - observer.shutdown(); disruptor.shutdown(); + observer.shutdown(); executor.shutdown(); } @@ -267,6 +264,11 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl return executor; } + @VisibleForTesting + ExecutorService observer() { + return observer; + } + @VisibleForTesting Multimap listeners() { return listeners; @@ -276,5 +278,4 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl ListenerRegistry subscriptionListeners() { return subscriptionListeners; } - }