X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=dom%2Fmdsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Fdom%2Fbroker%2FDOMNotificationRouter.java;h=4b3190a1da014eaf5bee51433f2021707d9ff5d5;hb=5f8a373c07549a901b70595067dd11c161d0c4e4;hp=42987a518468e9b7d7e443fb211a7cbce4b2858a;hpb=3eca227fa1f7819ed5d4a74c810047171f0e981e;p=mdsal.git diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java index 42987a5184..4b3190a1da 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java @@ -7,10 +7,12 @@ */ package org.opendaylight.mdsal.dom.broker; +import static java.util.Objects.requireNonNull; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.Multimap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimaps; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -21,28 +23,26 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.PreDestroy; import javax.inject.Inject; +import javax.inject.Singleton; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.mdsal.dom.api.DOMNotification; import org.opendaylight.mdsal.dom.api.DOMNotificationListener; +import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension; +import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension.DemandListener; import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService; import org.opendaylight.mdsal.dom.api.DOMNotificationService; -import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener; -import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry; -import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.AbstractRegistration; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.util.ListenerRegistry; +import org.opendaylight.yangtools.util.ObjectRegistry; import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager; -import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; +import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; @@ -61,14 +61,11 @@ import org.slf4j.LoggerFactory; * Internal implementation one by using a {@link QueuedNotificationManager}. *

*/ -@Component(immediate = true, configurationPid = "org.opendaylight.mdsal.dom.notification", service = { - DOMNotificationService.class, DOMNotificationPublishService.class, - DOMNotificationSubscriptionListenerRegistry.class -}) +@Singleton +@Component(configurationPid = "org.opendaylight.mdsal.dom.notification", service = DOMNotificationRouter.class) @Designate(ocd = DOMNotificationRouter.Config.class) // Non-final for testing -public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService, - DOMNotificationService, DOMNotificationSubscriptionListenerRegistry { +public class DOMNotificationRouter implements AutoCloseable { @ObjectClassDefinition() public @interface Config { @AttributeDefinition(name = "notification-queue-depth") @@ -76,14 +73,16 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl } @VisibleForTesting - abstract static sealed class Reg extends AbstractListenerRegistration { - Reg(final @NonNull T listener) { - super(listener); + abstract static sealed class Reg extends AbstractRegistration { + private final @NonNull DOMNotificationListener listener; + + Reg(final @NonNull DOMNotificationListener listener) { + this.listener = requireNonNull(listener); } } - private final class SingleReg extends Reg { - SingleReg(final @NonNull T listener) { + private final class SingleReg extends Reg { + SingleReg(final @NonNull DOMNotificationListener listener) { super(listener); } @@ -93,7 +92,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl } } - private static final class ComponentReg extends Reg { + private static final class ComponentReg extends Reg { ComponentReg(final @NonNull DOMNotificationListener listener) { super(listener); } @@ -104,17 +103,118 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl } } + private final class PublishFacade implements DOMNotificationPublishService, DOMNotificationPublishDemandExtension { + @Override + public List supportedExtensions() { + return List.of(this); + } + + @Override + public ListenableFuture putNotification(final DOMNotification notification) + throws InterruptedException { + return putNotificationImpl(notification); + } + + @Override + public ListenableFuture offerNotification(final DOMNotification notification) { + final var subscribers = listeners.get(notification.getType()); + return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers); + } + + @Override + public ListenableFuture offerNotification(final DOMNotification notification, + final long timeout, final TimeUnit unit) throws InterruptedException { + final var subscribers = listeners.get(notification.getType()); + if (subscribers.isEmpty()) { + return NO_LISTENERS; + } + // Attempt to perform a non-blocking publish first + final var noBlock = publish(notification, subscribers); + if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) { + return noBlock; + } + + try { + final var publishThread = Thread.currentThread(); + final var timerTask = observer.schedule(publishThread::interrupt, timeout, unit); + final var withBlock = putNotificationImpl(notification); + timerTask.cancel(true); + if (observer.getQueue().size() > 50) { + observer.purge(); + } + return withBlock; + } catch (InterruptedException e) { + return DOMNotificationPublishService.REJECTED; + } + } + + @Override + public Registration registerDemandListener(final DemandListener listener) { + final var initialTypes = listeners.keySet(); + executor.execute(() -> listener.onDemandUpdated(initialTypes)); + return demandListeners.register(listener); + } + } + + private final class SubscribeFacade implements DOMNotificationService { + @Override + public Registration registerNotificationListener(final DOMNotificationListener listener, + final Collection types) { + synchronized (DOMNotificationRouter.this) { + final var reg = new SingleReg(listener); + + if (!types.isEmpty()) { + final var b = ImmutableMultimap.builder(); + b.putAll(listeners); + + for (var t : types) { + b.put(t, reg); + } + + replaceListeners(b.build()); + } + + return reg; + } + } + + @Override + public synchronized Registration registerNotificationListeners( + final Map typeToListener) { + synchronized (DOMNotificationRouter.this) { + final var b = ImmutableMultimap.builder(); + b.putAll(listeners); + + final var tmp = new HashMap(); + for (var e : typeToListener.entrySet()) { + b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new)); + } + replaceListeners(b.build()); + + final var regs = List.copyOf(tmp.values()); + return new AbstractRegistration() { + @Override + protected void removeRegistration() { + regs.forEach(ComponentReg::close); + removeRegistrations(regs); + } + }; + } + } + } + private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class); - private static final @NonNull ListenableFuture NO_LISTENERS = FluentFutures.immediateNullFluentFuture(); + private static final @NonNull ListenableFuture NO_LISTENERS = Futures.immediateFuture(Empty.value()); - private final ListenerRegistry subscriptionListeners = - ListenerRegistry.create(); - private final EqualityQueuedNotificationManager, - DOMNotificationRouterEvent> queueNotificationManager; + private final EqualityQueuedNotificationManager queueNotificationManager; + private final @NonNull DOMNotificationPublishService notificationPublishService = new PublishFacade(); + private final @NonNull DOMNotificationService notificationService = new SubscribeFacade(); + private final ObjectRegistry demandListeners = + ObjectRegistry.createConcurrent("notification demand listeners"); private final ScheduledThreadPoolExecutor observer; private final ExecutorService executor; - private volatile Multimap> listeners = ImmutableMultimap.of(); + private volatile ImmutableMultimap listeners = ImmutableMultimap.of(); @Inject public DOMNotificationRouter(final int maxQueueCapacity) { @@ -136,48 +236,15 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl this(config.queueDepth()); } - @Override - public synchronized ListenerRegistration registerNotificationListener( - final T listener, final Collection types) { - final var reg = new SingleReg<>(listener); - - if (!types.isEmpty()) { - final var b = ImmutableMultimap.>builder(); - b.putAll(listeners); - - for (var t : types) { - b.put(t, reg); - } - - replaceListeners(b.build()); - } - - return reg; + public @NonNull DOMNotificationService notificationService() { + return notificationService; } - @Override - public synchronized Registration registerNotificationListeners( - final Map typeToListener) { - final var b = ImmutableMultimap.>builder(); - b.putAll(listeners); - - final var tmp = new HashMap(); - for (var e : typeToListener.entrySet()) { - b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new)); - } - replaceListeners(b.build()); - - final var regs = List.copyOf(tmp.values()); - return new AbstractRegistration() { - @Override - protected void removeRegistration() { - regs.forEach(ComponentReg::close); - removeRegistrations(regs); - } - }; + public @NonNull DOMNotificationPublishService notificationPublishService() { + return notificationPublishService; } - private synchronized void removeRegistration(final SingleReg reg) { + private synchronized void removeRegistration(final SingleReg reg) { replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> input != reg))); } @@ -190,86 +257,44 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl * * @param newListeners is used to notify listenerTypes changed */ - private void replaceListeners(final Multimap> newListeners) { + private void replaceListeners(final ImmutableMultimap newListeners) { listeners = newListeners; notifyListenerTypesChanged(newListeners.keySet()); } @SuppressWarnings("checkstyle:IllegalCatch") - private void notifyListenerTypesChanged(final Set typesAfter) { - final var listenersAfter = subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList()); + private void notifyListenerTypesChanged(final @NonNull ImmutableSet typesAfter) { + final var listenersAfter = demandListeners.streamObjects().collect(ImmutableList.toImmutableList()); executor.execute(() -> { - for (var subListener : listenersAfter) { + for (var listener : listenersAfter) { try { - subListener.onSubscriptionChanged(typesAfter); + listener.onDemandUpdated(typesAfter); } catch (final Exception e) { - LOG.warn("Uncaught exception during invoking listener {}", subListener, e); + LOG.warn("Uncaught exception during invoking listener {}", listener, e); } } }); } - @Override - public ListenerRegistration registerSubscriptionListener( - final L listener) { - final var initialTypes = listeners.keySet(); - executor.execute(() -> listener.onSubscriptionChanged(initialTypes)); - return subscriptionListeners.register(listener); + @VisibleForTesting + @NonNull ListenableFuture putNotificationImpl(final DOMNotification notification) + throws InterruptedException { + final var subscribers = listeners.get(notification.getType()); + return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers); } @VisibleForTesting - @NonNull ListenableFuture publish(final DOMNotification notification, - final Collection> subscribers) { - final var futures = new ArrayList>(subscribers.size()); + @NonNull ListenableFuture publish(final DOMNotification notification, final Collection subscribers) { + final var futures = new ArrayList>(subscribers.size()); subscribers.forEach(subscriber -> { final var event = new DOMNotificationRouterEvent(notification); futures.add(event.future()); queueNotificationManager.submitNotification(subscriber, event); }); - return Futures.transform(Futures.successfulAsList(futures), ignored -> (Void)null, + return Futures.transform(Futures.successfulAsList(futures), ignored -> Empty.value(), MoreExecutors.directExecutor()); } - @Override - public ListenableFuture putNotification(final DOMNotification notification) - throws InterruptedException { - final var subscribers = listeners.get(notification.getType()); - return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers); - } - - @Override - public ListenableFuture offerNotification(final DOMNotification notification) { - final var subscribers = listeners.get(notification.getType()); - return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers); - } - - @Override - public ListenableFuture offerNotification(final DOMNotification notification, final long timeout, - final TimeUnit unit) throws InterruptedException { - final var subscribers = listeners.get(notification.getType()); - if (subscribers.isEmpty()) { - return NO_LISTENERS; - } - // Attempt to perform a non-blocking publish first - final var noBlock = publish(notification, subscribers); - if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) { - return noBlock; - } - - try { - final var publishThread = Thread.currentThread(); - final var timerTask = observer.schedule(publishThread::interrupt, timeout, unit); - final var withBlock = putNotification(notification); - timerTask.cancel(true); - if (observer.getQueue().size() > 50) { - observer.purge(); - } - return withBlock; - } catch (InterruptedException e) { - return DOMNotificationPublishService.REJECTED; - } - } - @PreDestroy @Deactivate @Override @@ -290,19 +315,18 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl } @VisibleForTesting - Multimap listeners() { + ImmutableMultimap listeners() { return listeners; } @VisibleForTesting - ListenerRegistry subscriptionListeners() { - return subscriptionListeners; + ObjectRegistry demandListeners() { + return demandListeners; } - private static void deliverEvents(final AbstractListenerRegistration reg, - final ImmutableList events) { + private static void deliverEvents(final Reg reg, final ImmutableList events) { if (reg.notClosed()) { - final var listener = reg.getInstance(); + final var listener = reg.listener; for (var event : events) { event.deliverTo(listener); }