X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=dom%2Fmdsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Fdom%2Fbroker%2FDOMNotificationRouter.java;h=ef4db233e4f2a02921fca60ac7d0be048f8e5733;hb=8ef9ac5c2b42d32c8cea565f3494a3aa0e6e109b;hp=6d524a1ab49aa6c237413959a9342b447928d9f4;hpb=6dc0dbb76325cf16bc6980fe474eba00370a5703;p=mdsal.git diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java index 6d524a1ab4..ef4db233e4 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java @@ -7,22 +7,17 @@ */ package org.opendaylight.mdsal.dom.broker; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableMultimap.Builder; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.InsufficientCapacityException; -import com.lmax.disruptor.PhasedBackoffWaitStrategy; -import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; @@ -40,7 +35,9 @@ import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistr import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.util.ListenerRegistry; +import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager; import org.opendaylight.yangtools.util.concurrent.FluentFutures; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,34 +47,19 @@ import org.slf4j.LoggerFactory; * routing of notifications from publishers to subscribers. * *

- * Internal implementation works by allocating a two-handler Disruptor. The first handler delivers notifications - * to subscribed listeners and the second one notifies whoever may be listening on the returned future. Registration - * state tracking is performed by a simple immutable multimap -- when a registration or unregistration occurs we - * re-generate the entire map from scratch and set it atomically. While registrations/unregistrations synchronize - * on this instance, notifications do not take any locks here. - * - *

- * The fully-blocking {@link #publish(long, DOMNotification, Collection)} - * and non-blocking {@link #offerNotification(DOMNotification)} - * are realized using the Disruptor's native operations. The bounded-blocking {@link - * #offerNotification(DOMNotification, long, TimeUnit)} - * is realized by arming a background wakeup interrupt. + * Internal implementation one by using a {@link QueuedNotificationManager}. + *

*/ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService, DOMNotificationService, DOMNotificationSubscriptionListenerRegistry { private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class); private static final ListenableFuture NO_LISTENERS = FluentFutures.immediateNullFluentFuture(); - private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock( - 1L, 30L, TimeUnit.MILLISECONDS); - private static final EventHandler DISPATCH_NOTIFICATIONS = - (event, sequence, endOfBatch) -> event.deliverNotification(); - private static final EventHandler NOTIFY_FUTURE = - (event, sequence, endOfBatch) -> event.setFuture(); private final ListenerRegistry subscriptionListeners = ListenerRegistry.create(); - private final Disruptor disruptor; + private final EqualityQueuedNotificationManager, + DOMNotificationRouterEvent> queueNotificationManager; private final ScheduledThreadPoolExecutor observer; private final ExecutorService executor; @@ -85,28 +67,17 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl ImmutableMultimap.of(); @VisibleForTesting - DOMNotificationRouter(final int queueDepth, final WaitStrategy strategy) { + DOMNotificationRouter(int maxQueueCapacity) { observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-observer-%d").build()); executor = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-listeners-%d").build()); - disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, - new ThreadFactoryBuilder().setNameFormat("DOMNotificationRouter-disruptor-%d").build(), - ProducerType.MULTI, strategy); - disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS); - disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE); - disruptor.start(); + queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor, + maxQueueCapacity, DOMNotificationRouter::deliverEvents); } - public static DOMNotificationRouter create(final int queueDepth) { - return new DOMNotificationRouter(queueDepth, DEFAULT_STRATEGY); - } - - public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime, - final TimeUnit unit) { - checkArgument(Long.lowestOneBit(queueDepth) == Long.highestOneBit(queueDepth), - "Queue depth %s is not power-of-two", queueDepth); - return new DOMNotificationRouter(queueDepth, PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit)); + public static DOMNotificationRouter create(int maxQueueCapacity) { + return new DOMNotificationRouter(maxQueueCapacity); } @Override @@ -171,12 +142,18 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl return subscriptionListeners.register(listener); } - private ListenableFuture publish(final long seq, final DOMNotification notification, + + @VisibleForTesting + ListenableFuture publish(DOMNotification notification, final Collection> subscribers) { - final DOMNotificationRouterEvent event = disruptor.get(seq); - final ListenableFuture future = event.initialize(notification, subscribers); - disruptor.getRingBuffer().publish(seq); - return future; + final List> futures = new ArrayList<>(subscribers.size()); + subscribers.forEach(subscriber -> { + final DOMNotificationRouterEvent event = new DOMNotificationRouterEvent(notification); + futures.add(event.future()); + queueNotificationManager.submitNotification(subscriber, event); + }); + return Futures.transform(Futures.successfulAsList(futures), ignored -> (Void)null, + MoreExecutors.directExecutor()); } @Override @@ -188,22 +165,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl return NO_LISTENERS; } - final long seq = disruptor.getRingBuffer().next(); - return publish(seq, notification, subscribers); - } - - @SuppressWarnings("checkstyle:IllegalCatch") - @VisibleForTesting - ListenableFuture tryPublish(final DOMNotification notification, - final Collection> subscribers) { - final long seq; - try { - seq = disruptor.getRingBuffer().tryNext(); - } catch (final InsufficientCapacityException e) { - return DOMNotificationPublishService.REJECTED; - } - - return publish(seq, notification, subscribers); + return publish(notification, subscribers); } @Override @@ -214,7 +176,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl return NO_LISTENERS; } - return tryPublish(notification, subscribers); + return publish(notification, subscribers); } @Override @@ -226,7 +188,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl return NO_LISTENERS; } // Attempt to perform a non-blocking publish first - final ListenableFuture noBlock = tryPublish(notification, subscribers); + final ListenableFuture noBlock = publish(notification, subscribers); if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) { return noBlock; } @@ -247,7 +209,6 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl @Override public void close() { - disruptor.shutdown(); observer.shutdown(); executor.shutdown(); } @@ -271,4 +232,16 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl ListenerRegistry subscriptionListeners() { return subscriptionListeners; } + + private static void deliverEvents(final AbstractListenerRegistration reg, + final ImmutableList events) { + if (reg.notClosed()) { + final DOMNotificationListener listener = reg.getInstance(); + for (DOMNotificationRouterEvent event : events) { + event.deliverTo(listener); + } + } else { + events.forEach(DOMNotificationRouterEvent::clear); + } + } }