From 8ef9ac5c2b42d32c8cea565f3494a3aa0e6e109b Mon Sep 17 00:00:00 2001 From: Tadei Bilan Date: Fri, 24 Apr 2020 12:03:34 +0200 Subject: [PATCH] Replace lmax disruptor with QueuedNotificationManager LMAX tends to eat CPU when being idle and does not deliver things in parallel. QueuedNotificationManager seems to fare better in this regard. JIRA: MDSAL-546 Change-Id: I6f0e100110bd0888e55b4a21127306293ad97202 Signed-off-by: Tomas Cere Signed-off-by: tadei.bilan Signed-off-by: Robert Varga --- dom/mdsal-dom-broker/pom.xml | 4 - .../dom/broker/DOMNotificationRouter.java | 107 +++++++----------- .../broker/DOMNotificationRouterEvent.java | 45 +++----- .../dom/broker/OSGiDOMNotificationRouter.java | 7 +- .../dom/broker/DOMNotificationRouterTest.java | 36 +++--- features/odl-mdsal-dom-broker/pom.xml | 6 - .../src/main/feature/feature.xml | 6 - 7 files changed, 78 insertions(+), 133 deletions(-) delete mode 100644 features/odl-mdsal-dom-broker/src/main/feature/feature.xml diff --git a/dom/mdsal-dom-broker/pom.xml b/dom/mdsal-dom-broker/pom.xml index 441b4319e2..97d83b88a3 100644 --- a/dom/mdsal-dom-broker/pom.xml +++ b/dom/mdsal-dom-broker/pom.xml @@ -24,10 +24,6 @@ com.google.guava guava - - com.lmax - disruptor - org.opendaylight.mdsal diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java index 6d524a1ab4..ef4db233e4 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java @@ -7,22 +7,17 @@ */ package org.opendaylight.mdsal.dom.broker; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableMultimap.Builder; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.InsufficientCapacityException; -import com.lmax.disruptor.PhasedBackoffWaitStrategy; -import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; @@ -40,7 +35,9 @@ import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistr import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.util.ListenerRegistry; +import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager; import org.opendaylight.yangtools.util.concurrent.FluentFutures; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,34 +47,19 @@ import org.slf4j.LoggerFactory; * routing of notifications from publishers to subscribers. * *

- * Internal implementation works by allocating a two-handler Disruptor. The first handler delivers notifications - * to subscribed listeners and the second one notifies whoever may be listening on the returned future. Registration - * state tracking is performed by a simple immutable multimap -- when a registration or unregistration occurs we - * re-generate the entire map from scratch and set it atomically. While registrations/unregistrations synchronize - * on this instance, notifications do not take any locks here. - * - *

- * The fully-blocking {@link #publish(long, DOMNotification, Collection)} - * and non-blocking {@link #offerNotification(DOMNotification)} - * are realized using the Disruptor's native operations. The bounded-blocking {@link - * #offerNotification(DOMNotification, long, TimeUnit)} - * is realized by arming a background wakeup interrupt. + * Internal implementation one by using a {@link QueuedNotificationManager}. + *

*/ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService, DOMNotificationService, DOMNotificationSubscriptionListenerRegistry { private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class); private static final ListenableFuture NO_LISTENERS = FluentFutures.immediateNullFluentFuture(); - private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock( - 1L, 30L, TimeUnit.MILLISECONDS); - private static final EventHandler DISPATCH_NOTIFICATIONS = - (event, sequence, endOfBatch) -> event.deliverNotification(); - private static final EventHandler NOTIFY_FUTURE = - (event, sequence, endOfBatch) -> event.setFuture(); private final ListenerRegistry subscriptionListeners = ListenerRegistry.create(); - private final Disruptor disruptor; + private final EqualityQueuedNotificationManager, + DOMNotificationRouterEvent> queueNotificationManager; private final ScheduledThreadPoolExecutor observer; private final ExecutorService executor; @@ -85,28 +67,17 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl ImmutableMultimap.of(); @VisibleForTesting - DOMNotificationRouter(final int queueDepth, final WaitStrategy strategy) { + DOMNotificationRouter(int maxQueueCapacity) { observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-observer-%d").build()); executor = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-listeners-%d").build()); - disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, - new ThreadFactoryBuilder().setNameFormat("DOMNotificationRouter-disruptor-%d").build(), - ProducerType.MULTI, strategy); - disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS); - disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE); - disruptor.start(); + queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor, + maxQueueCapacity, DOMNotificationRouter::deliverEvents); } - public static DOMNotificationRouter create(final int queueDepth) { - return new DOMNotificationRouter(queueDepth, DEFAULT_STRATEGY); - } - - public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime, - final TimeUnit unit) { - checkArgument(Long.lowestOneBit(queueDepth) == Long.highestOneBit(queueDepth), - "Queue depth %s is not power-of-two", queueDepth); - return new DOMNotificationRouter(queueDepth, PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit)); + public static DOMNotificationRouter create(int maxQueueCapacity) { + return new DOMNotificationRouter(maxQueueCapacity); } @Override @@ -171,12 +142,18 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl return subscriptionListeners.register(listener); } - private ListenableFuture publish(final long seq, final DOMNotification notification, + + @VisibleForTesting + ListenableFuture publish(DOMNotification notification, final Collection> subscribers) { - final DOMNotificationRouterEvent event = disruptor.get(seq); - final ListenableFuture future = event.initialize(notification, subscribers); - disruptor.getRingBuffer().publish(seq); - return future; + final List> futures = new ArrayList<>(subscribers.size()); + subscribers.forEach(subscriber -> { + final DOMNotificationRouterEvent event = new DOMNotificationRouterEvent(notification); + futures.add(event.future()); + queueNotificationManager.submitNotification(subscriber, event); + }); + return Futures.transform(Futures.successfulAsList(futures), ignored -> (Void)null, + MoreExecutors.directExecutor()); } @Override @@ -188,22 +165,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl return NO_LISTENERS; } - final long seq = disruptor.getRingBuffer().next(); - return publish(seq, notification, subscribers); - } - - @SuppressWarnings("checkstyle:IllegalCatch") - @VisibleForTesting - ListenableFuture tryPublish(final DOMNotification notification, - final Collection> subscribers) { - final long seq; - try { - seq = disruptor.getRingBuffer().tryNext(); - } catch (final InsufficientCapacityException e) { - return DOMNotificationPublishService.REJECTED; - } - - return publish(seq, notification, subscribers); + return publish(notification, subscribers); } @Override @@ -214,7 +176,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl return NO_LISTENERS; } - return tryPublish(notification, subscribers); + return publish(notification, subscribers); } @Override @@ -226,7 +188,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl return NO_LISTENERS; } // Attempt to perform a non-blocking publish first - final ListenableFuture noBlock = tryPublish(notification, subscribers); + final ListenableFuture noBlock = publish(notification, subscribers); if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) { return noBlock; } @@ -247,7 +209,6 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl @Override public void close() { - disruptor.shutdown(); observer.shutdown(); executor.shutdown(); } @@ -271,4 +232,16 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl ListenerRegistry subscriptionListeners() { return subscriptionListeners; } + + private static void deliverEvents(final AbstractListenerRegistration reg, + final ImmutableList events) { + if (reg.notClosed()) { + final DOMNotificationListener listener = reg.getInstance(); + for (DOMNotificationRouterEvent event : events) { + event.deliverTo(listener); + } + } else { + events.forEach(DOMNotificationRouterEvent::clear); + } + } } diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouterEvent.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouterEvent.java index 9dd4f6a3ec..4c870ba173 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouterEvent.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouterEvent.java @@ -11,54 +11,41 @@ import static java.util.Objects.requireNonNull; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import com.lmax.disruptor.EventFactory; -import java.util.Collection; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.mdsal.dom.api.DOMNotification; import org.opendaylight.mdsal.dom.api.DOMNotificationListener; -import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A single notification event in the disruptor ringbuffer. These objects are reused, so they do have mutable state. + * A single notification event in the notification router. */ final class DOMNotificationRouterEvent { private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouterEvent.class); - static final EventFactory FACTORY = DOMNotificationRouterEvent::new; + private final SettableFuture future = SettableFuture.create(); + private final @NonNull DOMNotification notification; - private Collection> subscribers; - private DOMNotification notification; - private SettableFuture future; - - private DOMNotificationRouterEvent() { - // Hidden on purpose, initialized in initialize() + DOMNotificationRouterEvent(final DOMNotification notification) { + this.notification = requireNonNull(notification); } - @SuppressWarnings("checkstyle:hiddenField") - ListenableFuture initialize(final DOMNotification notification, - final Collection> subscribers) { - this.notification = requireNonNull(notification); - this.subscribers = requireNonNull(subscribers); - this.future = SettableFuture.create(); - return this.future; + ListenableFuture future() { + return future; } @SuppressWarnings("checkstyle:illegalCatch") - void deliverNotification() { - for (AbstractListenerRegistration reg : subscribers) { - if (reg.notClosed()) { - final DOMNotificationListener listener = reg.getInstance(); - try { - listener.onNotification(notification); - } catch (Exception e) { - LOG.warn("Listener {} failed during notification delivery", listener, e); - } - } + void deliverTo(DOMNotificationListener listener) { + try { + listener.onNotification(notification); + } catch (Exception e) { + LOG.warn("Listener {} failed during notification delivery", listener, e); + } finally { + clear(); } } - void setFuture() { + void clear() { future.set(null); } } diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/OSGiDOMNotificationRouter.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/OSGiDOMNotificationRouter.java index da60ca811d..3de602c988 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/OSGiDOMNotificationRouter.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/OSGiDOMNotificationRouter.java @@ -38,10 +38,6 @@ public final class OSGiDOMNotificationRouter implements DOMNotificationService, public @interface Config { @AttributeDefinition(name = "notification-queue-depth") int queueDepth() default 65536; - @AttributeDefinition(name = "notification-queue-spin") - long spinTime() default 0; - @AttributeDefinition(name = "notification-queue-park") - long parkTime() default 0; } private static final Logger LOG = LoggerFactory.getLogger(OSGiDOMNotificationRouter.class); @@ -50,8 +46,7 @@ public final class OSGiDOMNotificationRouter implements DOMNotificationService, @Activate void activate(final Config config) { - router = DOMNotificationRouter.create(config.queueDepth(), config.spinTime(), config.parkTime(), - TimeUnit.MILLISECONDS); + router = DOMNotificationRouter.create(config.queueDepth()); LOG.info("DOM Notification Router started"); } diff --git a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouterTest.java b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouterTest.java index 2781a72e7f..c071c5dd1d 100644 --- a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouterTest.java +++ b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouterTest.java @@ -13,13 +13,13 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListenableFuture; -import com.lmax.disruptor.PhasedBackoffWaitStrategy; -import com.lmax.disruptor.WaitStrategy; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -38,13 +38,9 @@ import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absol public class DOMNotificationRouterTest extends TestUtils { - private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock( - 1L, 30L, TimeUnit.MILLISECONDS); - @Test public void create() throws Exception { - assertNotNull(DOMNotificationRouter.create(1,1,1,TimeUnit.SECONDS)); - assertNotNull(DOMNotificationRouter.create(1)); + assertNotNull(DOMNotificationRouter.create(1024)); } @SuppressWarnings("checkstyle:IllegalCatch") @@ -52,9 +48,11 @@ public class DOMNotificationRouterTest extends TestUtils { public void complexTest() throws Exception { final DOMNotificationSubscriptionListener domNotificationSubscriptionListener = mock(DOMNotificationSubscriptionListener.class); + doNothing().when(domNotificationSubscriptionListener).onSubscriptionChanged(any()); + final CountDownLatch latch = new CountDownLatch(1); final DOMNotificationListener domNotificationListener = new TestListener(latch); - final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1); + final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1024); Multimap listeners = domNotificationRouter.listeners(); @@ -98,7 +96,7 @@ public class DOMNotificationRouterTest extends TestUtils { @Test public void offerNotification() throws Exception { - final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1); + final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1024); final DOMNotification domNotification = mock(DOMNotification.class); doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType(); doReturn(TEST_CHILD).when(domNotification).getBody(); @@ -126,14 +124,14 @@ public class DOMNotificationRouterTest extends TestUtils { assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size()); assertEquals(DOMNotificationPublishService.REJECTED, - testRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS)); + testRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS)); assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size()); } } @Test public void close() throws Exception { - final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1); + final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1024); final ExecutorService executor = domNotificationRouter.executor(); final ExecutorService observer = domNotificationRouter.observer(); @@ -164,14 +162,22 @@ public class DOMNotificationRouterTest extends TestUtils { } private static class TestRouter extends DOMNotificationRouter { + + private boolean triggerRejected = false; + TestRouter(final int queueDepth) { - super(queueDepth, DEFAULT_STRATEGY); + super(queueDepth); } @Override - protected ListenableFuture tryPublish(final DOMNotification notification, - final Collection> subscribers) { - return DOMNotificationPublishService.REJECTED; + ListenableFuture publish(DOMNotification notification, + Collection> subscribers) { + if (triggerRejected) { + return REJECTED; + } + + triggerRejected = true; + return super.publish(notification, subscribers); } @Override diff --git a/features/odl-mdsal-dom-broker/pom.xml b/features/odl-mdsal-dom-broker/pom.xml index 0678095429..68c064f305 100644 --- a/features/odl-mdsal-dom-broker/pom.xml +++ b/features/odl-mdsal-dom-broker/pom.xml @@ -21,12 +21,6 @@ MD-SAL DOM implementation - - org.opendaylight.odlparent - odl-lmax-3 - xml - features - org.opendaylight.mdsal odl-mdsal-dom-runtime diff --git a/features/odl-mdsal-dom-broker/src/main/feature/feature.xml b/features/odl-mdsal-dom-broker/src/main/feature/feature.xml deleted file mode 100644 index 2924a366d2..0000000000 --- a/features/odl-mdsal-dom-broker/src/main/feature/feature.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - odl-lmax-3 - - -- 2.36.6