X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=dom%2Fmdsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Fdom%2Fbroker%2FDOMNotificationRouter.java;h=4b3190a1da014eaf5bee51433f2021707d9ff5d5;hb=5f8a373c07549a901b70595067dd11c161d0c4e4;hp=448eb310c31cab1a143fa6c829f1cc44f2c72ece;hpb=d068a3a950baa4505e0126614137f4178592cc24;p=mdsal.git diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java index 448eb310c3..4b3190a1da 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java @@ -7,39 +7,49 @@ */ package org.opendaylight.mdsal.dom.broker; -import org.opendaylight.mdsal.dom.api.DOMNotification; -import org.opendaylight.mdsal.dom.api.DOMNotificationListener; -import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService; -import org.opendaylight.mdsal.dom.api.DOMNotificationService; +import static java.util.Objects.requireNonNull; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.ImmutableMultimap.Builder; -import com.google.common.collect.Multimap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimaps; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.InsufficientCapacityException; -import com.lmax.disruptor.PhasedBackoffWaitStrategy; -import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; -import java.util.Arrays; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListener; -import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListenerRegistry; -import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.util.ListenerRegistry; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.mdsal.dom.api.DOMNotificationListener; +import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension; +import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension.DemandListener; +import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService; +import org.opendaylight.mdsal.dom.api.DOMNotificationService; +import org.opendaylight.yangtools.concepts.AbstractRegistration; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.util.ObjectRegistry; +import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; +import org.opendaylight.yangtools.yang.common.Empty; +import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.metatype.annotations.AttributeDefinition; +import org.osgi.service.metatype.annotations.Designate; +import org.osgi.service.metatype.annotations.ObjectClassDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,206 +57,281 @@ import org.slf4j.LoggerFactory; * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides * routing of notifications from publishers to subscribers. * - * Internal implementation works by allocating a two-handler Disruptor. The first handler delivers notifications - * to subscribed listeners and the second one notifies whoever may be listening on the returned future. Registration - * state tracking is performed by a simple immutable multimap -- when a registration or unregistration occurs we - * re-generate the entire map from scratch and set it atomically. While registrations/unregistrations synchronize - * on this instance, notifications do not take any locks here. - * - * The fully-blocking {@link #publish(long, DOMNotification, Collection)} and non-blocking {@link #offerNotification(DOMNotification)} - * are realized using the Disruptor's native operations. The bounded-blocking {@link #offerNotification(DOMNotification, long, TimeUnit)} - * is realized by arming a background wakeup interrupt. + *

+ * Internal implementation one by using a {@link QueuedNotificationManager}. + *

*/ -public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService, - DOMNotificationService, DOMNotificationSubscriptionListenerRegistry { +@Singleton +@Component(configurationPid = "org.opendaylight.mdsal.dom.notification", service = DOMNotificationRouter.class) +@Designate(ocd = DOMNotificationRouter.Config.class) +// Non-final for testing +public class DOMNotificationRouter implements AutoCloseable { + @ObjectClassDefinition() + public @interface Config { + @AttributeDefinition(name = "notification-queue-depth") + int queueDepth() default 65536; + } - private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class); - private static final ListenableFuture NO_LISTENERS = Futures.immediateFuture(null); - private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(1L, 30L, TimeUnit.MILLISECONDS); - private static final EventHandler DISPATCH_NOTIFICATIONS = new EventHandler() { - @Override - public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) throws Exception { - event.deliverNotification(); + @VisibleForTesting + abstract static sealed class Reg extends AbstractRegistration { + private final @NonNull DOMNotificationListener listener; + Reg(final @NonNull DOMNotificationListener listener) { + this.listener = requireNonNull(listener); } - }; - private static final EventHandler NOTIFY_FUTURE = new EventHandler() { - @Override - public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) { - event.setFuture(); + } + + private final class SingleReg extends Reg { + SingleReg(final @NonNull DOMNotificationListener listener) { + super(listener); } - }; - private final Disruptor disruptor; - private final ExecutorService executor; - private volatile Multimap> listeners = ImmutableMultimap.of(); - private final ListenerRegistry subscriptionListeners = ListenerRegistry.create(); + @Override + protected void removeRegistration() { + DOMNotificationRouter.this.removeRegistration(this); + } + } - @SuppressWarnings("unchecked") - private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) { - this.executor = Preconditions.checkNotNull(executor); + private static final class ComponentReg extends Reg { + ComponentReg(final @NonNull DOMNotificationListener listener) { + super(listener); + } - disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, strategy); - disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS); - disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE); - disruptor.start(); + @Override + protected void removeRegistration() { + // No-op + } } - public static DOMNotificationRouter create(final int queueDepth) { - final ExecutorService executor = Executors.newCachedThreadPool(); + private final class PublishFacade implements DOMNotificationPublishService, DOMNotificationPublishDemandExtension { + @Override + public List supportedExtensions() { + return List.of(this); + } - return new DOMNotificationRouter(executor, queueDepth, DEFAULT_STRATEGY); - } + @Override + public ListenableFuture putNotification(final DOMNotification notification) + throws InterruptedException { + return putNotificationImpl(notification); + } - public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime, final TimeUnit unit) { - final ExecutorService executor = Executors.newCachedThreadPool(); - final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit); + @Override + public ListenableFuture offerNotification(final DOMNotification notification) { + final var subscribers = listeners.get(notification.getType()); + return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers); + } - return new DOMNotificationRouter(executor, queueDepth, strategy); - } + @Override + public ListenableFuture offerNotification(final DOMNotification notification, + final long timeout, final TimeUnit unit) throws InterruptedException { + final var subscribers = listeners.get(notification.getType()); + if (subscribers.isEmpty()) { + return NO_LISTENERS; + } + // Attempt to perform a non-blocking publish first + final var noBlock = publish(notification, subscribers); + if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) { + return noBlock; + } - @Override - public synchronized ListenerRegistration registerNotificationListener(final T listener, final Collection types) { - final ListenerRegistration reg = new AbstractListenerRegistration(listener) { - @Override - protected void removeRegistration() { - final ListenerRegistration me = this; - - synchronized (DOMNotificationRouter.this) { - replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, new Predicate>() { - @Override - public boolean apply(final ListenerRegistration input) { - return input != me; - } - }))); + try { + final var publishThread = Thread.currentThread(); + final var timerTask = observer.schedule(publishThread::interrupt, timeout, unit); + final var withBlock = putNotificationImpl(notification); + timerTask.cancel(true); + if (observer.getQueue().size() > 50) { + observer.purge(); } + return withBlock; + } catch (InterruptedException e) { + return DOMNotificationPublishService.REJECTED; } - }; + } + + @Override + public Registration registerDemandListener(final DemandListener listener) { + final var initialTypes = listeners.keySet(); + executor.execute(() -> listener.onDemandUpdated(initialTypes)); + return demandListeners.register(listener); + } + } + + private final class SubscribeFacade implements DOMNotificationService { + @Override + public Registration registerNotificationListener(final DOMNotificationListener listener, + final Collection types) { + synchronized (DOMNotificationRouter.this) { + final var reg = new SingleReg(listener); + + if (!types.isEmpty()) { + final var b = ImmutableMultimap.builder(); + b.putAll(listeners); + + for (var t : types) { + b.put(t, reg); + } - if (!types.isEmpty()) { - final Builder> b = ImmutableMultimap.builder(); - b.putAll(listeners); + replaceListeners(b.build()); + } - for (final SchemaPath t : types) { - b.put(t, reg); + return reg; } + } - replaceListeners(b.build()); + @Override + public synchronized Registration registerNotificationListeners( + final Map typeToListener) { + synchronized (DOMNotificationRouter.this) { + final var b = ImmutableMultimap.builder(); + b.putAll(listeners); + + final var tmp = new HashMap(); + for (var e : typeToListener.entrySet()) { + b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new)); + } + replaceListeners(b.build()); + + final var regs = List.copyOf(tmp.values()); + return new AbstractRegistration() { + @Override + protected void removeRegistration() { + regs.forEach(ComponentReg::close); + removeRegistrations(regs); + } + }; + } } + } - return reg; + private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class); + private static final @NonNull ListenableFuture NO_LISTENERS = Futures.immediateFuture(Empty.value()); + + private final EqualityQueuedNotificationManager queueNotificationManager; + private final @NonNull DOMNotificationPublishService notificationPublishService = new PublishFacade(); + private final @NonNull DOMNotificationService notificationService = new SubscribeFacade(); + private final ObjectRegistry demandListeners = + ObjectRegistry.createConcurrent("notification demand listeners"); + private final ScheduledThreadPoolExecutor observer; + private final ExecutorService executor; + + private volatile ImmutableMultimap listeners = ImmutableMultimap.of(); + + @Inject + public DOMNotificationRouter(final int maxQueueCapacity) { + observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("DOMNotificationRouter-observer-%d") + .build()); + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("DOMNotificationRouter-listeners-%d") + .build()); + queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor, + maxQueueCapacity, DOMNotificationRouter::deliverEvents); + LOG.info("DOM Notification Router started"); } - @Override - public ListenerRegistration registerNotificationListener(final T listener, final SchemaPath... types) { - return registerNotificationListener(listener, Arrays.asList(types)); + @Activate + public DOMNotificationRouter(final Config config) { + this(config.queueDepth()); + } + + public @NonNull DOMNotificationService notificationService() { + return notificationService; + } + + public @NonNull DOMNotificationPublishService notificationPublishService() { + return notificationPublishService; + } + + private synchronized void removeRegistration(final SingleReg reg) { + replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> input != reg))); + } + + private synchronized void removeRegistrations(final List regs) { + replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> !regs.contains(input)))); } /** - * Swaps registered listeners and triggers notification update + * Swaps registered listeners and triggers notification update. * - * @param newListeners + * @param newListeners is used to notify listenerTypes changed */ - private void replaceListeners( - final Multimap> newListeners) { + private void replaceListeners(final ImmutableMultimap newListeners) { listeners = newListeners; notifyListenerTypesChanged(newListeners.keySet()); } - private void notifyListenerTypesChanged(final Set typesAfter) { - final List> listenersAfter =ImmutableList.copyOf(subscriptionListeners.getListeners()); - executor.submit(new Runnable() { - - @Override - public void run() { - for (final ListenerRegistration subListener : listenersAfter) { - try { - subListener.getInstance().onSubscriptionChanged(typesAfter); - } catch (final Exception e) { - LOG.warn("Uncaught exception during invoking listener {}", subListener.getInstance(), e); - } + @SuppressWarnings("checkstyle:IllegalCatch") + private void notifyListenerTypesChanged(final @NonNull ImmutableSet typesAfter) { + final var listenersAfter = demandListeners.streamObjects().collect(ImmutableList.toImmutableList()); + executor.execute(() -> { + for (var listener : listenersAfter) { + try { + listener.onDemandUpdated(typesAfter); + } catch (final Exception e) { + LOG.warn("Uncaught exception during invoking listener {}", listener, e); } } }); } - @Override - public ListenerRegistration registerSubscriptionListener( - final L listener) { - final Set initialTypes = listeners.keySet(); - executor.submit(new Runnable() { - - @Override - public void run() { - listener.onSubscriptionChanged(initialTypes); - } - }); - return subscriptionListeners.registerWithType(listener); + @VisibleForTesting + @NonNull ListenableFuture putNotificationImpl(final DOMNotification notification) + throws InterruptedException { + final var subscribers = listeners.get(notification.getType()); + return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers); } - private ListenableFuture publish(final long seq, final DOMNotification notification, final Collection> subscribers) { - final DOMNotificationRouterEvent event = disruptor.get(seq); - final ListenableFuture future = event.initialize(notification, subscribers); - disruptor.getRingBuffer().publish(seq); - return future; + @VisibleForTesting + @NonNull ListenableFuture publish(final DOMNotification notification, final Collection subscribers) { + final var futures = new ArrayList>(subscribers.size()); + subscribers.forEach(subscriber -> { + final var event = new DOMNotificationRouterEvent(notification); + futures.add(event.future()); + queueNotificationManager.submitNotification(subscriber, event); + }); + return Futures.transform(Futures.successfulAsList(futures), ignored -> Empty.value(), + MoreExecutors.directExecutor()); } + @PreDestroy + @Deactivate @Override - public ListenableFuture putNotification(final DOMNotification notification) throws InterruptedException { - final Collection> subscribers = listeners.get(notification.getType()); - if (subscribers.isEmpty()) { - return NO_LISTENERS; - } - - final long seq = disruptor.getRingBuffer().next(); - return publish(seq, notification, subscribers); + public void close() { + observer.shutdown(); + executor.shutdown(); + LOG.info("DOM Notification Router stopped"); } - private ListenableFuture tryPublish(final DOMNotification notification, final Collection> subscribers) { - final long seq; - try { - seq = disruptor.getRingBuffer().tryNext(); - } catch (final InsufficientCapacityException e) { - return DOMNotificationPublishService.REJECTED; - } - - return publish(seq, notification, subscribers); + @VisibleForTesting + ExecutorService executor() { + return executor; } - @Override - public ListenableFuture offerNotification(final DOMNotification notification) { - final Collection> subscribers = listeners.get(notification.getType()); - if (subscribers.isEmpty()) { - return NO_LISTENERS; - } - - return tryPublish(notification, subscribers); + @VisibleForTesting + ExecutorService observer() { + return observer; } - @Override - public ListenableFuture offerNotification(final DOMNotification notification, final long timeout, - final TimeUnit unit) throws InterruptedException { - final Collection> subscribers = listeners.get(notification.getType()); - if (subscribers.isEmpty()) { - return NO_LISTENERS; - } - - // Attempt to perform a non-blocking publish first - final ListenableFuture noBlock = tryPublish(notification, subscribers); - if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) { - return noBlock; - } + @VisibleForTesting + ImmutableMultimap listeners() { + return listeners; + } - /* - * FIXME: we need a background thread, which will watch out for blocking too long. Here - * we will arm a tasklet for it and synchronize delivery of interrupt properly. - */ - throw new UnsupportedOperationException("Not implemented yet"); + @VisibleForTesting + ObjectRegistry demandListeners() { + return demandListeners; } - @Override - public void close() { - disruptor.shutdown(); - executor.shutdown(); + private static void deliverEvents(final Reg reg, final ImmutableList events) { + if (reg.notClosed()) { + final var listener = reg.listener; + for (var event : events) { + event.deliverTo(listener); + } + } else { + events.forEach(DOMNotificationRouterEvent::clear); + } } }