X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=dom%2Fmdsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Fdom%2Fbroker%2FDOMNotificationRouter.java;h=66bccef6d1bee5e29f5de1cc63cc536fbccb10e9;hb=91749a5a5fb089e74306f288d786acb8d3c450ae;hp=c1e4ac6a9a8f7f5f672836ceb941392d05f682d5;hpb=f2910f31928b7f29c7e3593065ba35460052c38f;p=mdsal.git diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java index c1e4ac6a9a..66bccef6d1 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java @@ -7,15 +7,16 @@ */ package org.opendaylight.mdsal.dom.broker; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableMultimap.Builder; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.InsufficientCapacityException; import com.lmax.disruptor.PhasedBackoffWaitStrategy; @@ -28,6 +29,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.opendaylight.mdsal.dom.api.DOMNotification; import org.opendaylight.mdsal.dom.api.DOMNotificationListener; @@ -38,6 +41,7 @@ import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistr import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.util.ListenerRegistry; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,84 +64,67 @@ import org.slf4j.LoggerFactory; * #offerNotification(DOMNotification, long, TimeUnit)} * is realized by arming a background wakeup interrupt. */ -public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService, +public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService, DOMNotificationService, DOMNotificationSubscriptionListenerRegistry { private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class); - private static final ListenableFuture NO_LISTENERS = Futures.immediateFuture(null); + private static final ListenableFuture NO_LISTENERS = FluentFutures.immediateNullFluentFuture(); private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock( 1L, 30L, TimeUnit.MILLISECONDS); private static final EventHandler DISPATCH_NOTIFICATIONS = - new EventHandler() { - @Override - public void onEvent(final DOMNotificationRouterEvent event, final long sequence, - final boolean endOfBatch) throws Exception { - event.deliverNotification(); - - } - }; + (event, sequence, endOfBatch) -> event.deliverNotification(); private static final EventHandler NOTIFY_FUTURE = - new EventHandler() { - @Override - public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) { - event.setFuture(); - } - }; + (event, sequence, endOfBatch) -> event.setFuture(); - private final Disruptor disruptor; - private final ExecutorService executor; - private volatile Multimap> listeners = ImmutableMultimap.of(); private final ListenerRegistry subscriptionListeners = ListenerRegistry.create(); + private final Disruptor disruptor; + private final ScheduledThreadPoolExecutor observer; + private final ExecutorService executor; - @SuppressWarnings("unchecked") - private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) { - this.executor = Preconditions.checkNotNull(executor); - - disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, - queueDepth, executor, ProducerType.MULTI, strategy); + private volatile Multimap> listeners = + ImmutableMultimap.of(); + + @VisibleForTesting + DOMNotificationRouter(final int queueDepth, final WaitStrategy strategy) { + observer = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-observer-%d").build()); + executor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-listeners-%d").build()); + disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, + new ThreadFactoryBuilder().setNameFormat("DOMNotificationRouter-disruptor-%d").build(), + ProducerType.MULTI, strategy); disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS); disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE); disruptor.start(); } public static DOMNotificationRouter create(final int queueDepth) { - final ExecutorService executor = Executors.newCachedThreadPool(); - - return new DOMNotificationRouter(executor, queueDepth, DEFAULT_STRATEGY); + return new DOMNotificationRouter(queueDepth, DEFAULT_STRATEGY); } - public static DOMNotificationRouter create(final int queueDepth, final long spinTime, - final long parkTime, final TimeUnit unit) { - final ExecutorService executor = Executors.newCachedThreadPool(); - final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit); - - return new DOMNotificationRouter(executor, queueDepth, strategy); + public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime, + final TimeUnit unit) { + checkArgument(Long.lowestOneBit(queueDepth) == Long.highestOneBit(queueDepth), + "Queue depth %s is not power-of-two", queueDepth); + return new DOMNotificationRouter(queueDepth, PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit)); } @Override public synchronized ListenerRegistration registerNotificationListener( final T listener, final Collection types) { - final ListenerRegistration reg = new AbstractListenerRegistration(listener) { + final AbstractListenerRegistration reg = new AbstractListenerRegistration<>(listener) { @Override protected void removeRegistration() { - final ListenerRegistration me = this; - synchronized (DOMNotificationRouter.this) { replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, - new Predicate>() { - @Override - public boolean apply(final ListenerRegistration input) { - return input != me; - } - }))); + input -> input != this))); } } }; if (!types.isEmpty()) { - final Builder> b = + final Builder> b = ImmutableMultimap.builder(); b.putAll(listeners); @@ -163,25 +150,21 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati * @param newListeners is used to notify listenerTypes changed */ private void replaceListeners( - final Multimap> newListeners) { + final Multimap> newListeners) { listeners = newListeners; notifyListenerTypesChanged(newListeners.keySet()); } @SuppressWarnings("checkstyle:IllegalCatch") private void notifyListenerTypesChanged(final Set typesAfter) { - final List> listenersAfter = - ImmutableList.copyOf(subscriptionListeners.getListeners()); - executor.submit(new Runnable() { - - @Override - public void run() { - for (final ListenerRegistration subListener : listenersAfter) { - try { - subListener.getInstance().onSubscriptionChanged(typesAfter); - } catch (final Exception e) { - LOG.warn("Uncaught exception during invoking listener {}", subListener.getInstance(), e); - } + final List listenersAfter = + subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList()); + executor.execute(() -> { + for (final DOMNotificationSubscriptionListener subListener : listenersAfter) { + try { + subListener.onSubscriptionChanged(typesAfter); + } catch (final Exception e) { + LOG.warn("Uncaught exception during invoking listener {}", subListener, e); } } }); @@ -191,18 +174,12 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati public ListenerRegistration registerSubscriptionListener( final L listener) { final Set initialTypes = listeners.keySet(); - executor.submit(new Runnable() { - - @Override - public void run() { - listener.onSubscriptionChanged(initialTypes); - } - }); - return subscriptionListeners.registerWithType(listener); + executor.execute(() -> listener.onSubscriptionChanged(initialTypes)); + return subscriptionListeners.register(listener); } private ListenableFuture publish(final long seq, final DOMNotification notification, - final Collection> subscribers) { + final Collection> subscribers) { final DOMNotificationRouterEvent event = disruptor.get(seq); final ListenableFuture future = event.initialize(notification, subscribers); disruptor.getRingBuffer().publish(seq); @@ -212,7 +189,7 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati @Override public ListenableFuture putNotification(final DOMNotification notification) throws InterruptedException { - final Collection> subscribers = + final Collection> subscribers = listeners.get(notification.getType()); if (subscribers.isEmpty()) { return NO_LISTENERS; @@ -223,8 +200,9 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati } @SuppressWarnings("checkstyle:IllegalCatch") - private ListenableFuture tryPublish(final DOMNotification notification, - final Collection> subscribers) { + @VisibleForTesting + ListenableFuture tryPublish(final DOMNotification notification, + final Collection> subscribers) { final long seq; try { seq = disruptor.getRingBuffer().tryNext(); @@ -237,7 +215,7 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati @Override public ListenableFuture offerNotification(final DOMNotification notification) { - final Collection> subscribers = + final Collection> subscribers = listeners.get(notification.getType()); if (subscribers.isEmpty()) { return NO_LISTENERS; @@ -249,28 +227,55 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati @Override public ListenableFuture offerNotification(final DOMNotification notification, final long timeout, final TimeUnit unit) throws InterruptedException { - final Collection> subscribers = + final Collection> subscribers = listeners.get(notification.getType()); if (subscribers.isEmpty()) { return NO_LISTENERS; } - // Attempt to perform a non-blocking publish first - final ListenableFuture noBlock = tryPublish(notification, subscribers); + final ListenableFuture noBlock = tryPublish(notification, subscribers); if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) { return noBlock; } - /* - * FIXME: we need a background thread, which will watch out for blocking too long. Here - * we will arm a tasklet for it and synchronize delivery of interrupt properly. - */ - throw new UnsupportedOperationException("Not implemented yet"); + try { + final Thread publishThread = Thread.currentThread(); + ScheduledFuture timerTask = observer.schedule(publishThread::interrupt, timeout, unit); + final ListenableFuture withBlock = putNotification(notification); + timerTask.cancel(true); + if (observer.getQueue().size() > 50) { + observer.purge(); + } + return withBlock; + } catch (InterruptedException e) { + return DOMNotificationPublishService.REJECTED; + } } @Override public void close() { disruptor.shutdown(); + observer.shutdown(); executor.shutdown(); } + + @VisibleForTesting + ExecutorService executor() { + return executor; + } + + @VisibleForTesting + ExecutorService observer() { + return observer; + } + + @VisibleForTesting + Multimap listeners() { + return listeners; + } + + @VisibleForTesting + ListenerRegistry subscriptionListeners() { + return subscriptionListeners; + } }