From: Robert Varga Date: Tue, 9 Dec 2014 14:09:59 +0000 (+0100) Subject: BUG-2288: implement DOMNotificationRouter X-Git-Tag: release/lithium~601^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=ae2dfd5020d88c24f1e2cad28c021c6a8f98a6fd BUG-2288: implement DOMNotificationRouter Implements DOMNotification(Publish)Service using LMAX Disruptor. The disruptor is used internallly to transfer requests from publishers to two-stage notification dispatch, where the frist stage takes care of delivering events to subscribers and the second stage notifies the futures. Change-Id: I654d9d044e80b2a2ff6fd5b05ddceed4e79a4ebc Signed-off-by: Robert Varga --- diff --git a/features/mdsal/src/main/resources/features.xml b/features/mdsal/src/main/resources/features.xml index 1582f45789..8c166e6382 100644 --- a/features/mdsal/src/main/resources/features.xml +++ b/features/mdsal/src/main/resources/features.xml @@ -28,6 +28,7 @@ odl-mdsal-common odl-config-startup odl-config-netty + mvn:com.lmax/disruptor/${lmax.version} mvn:org.opendaylight.controller/sal-core-api/${project.version} mvn:org.opendaylight.controller/sal-core-spi/${project.version} mvn:org.opendaylight.controller/sal-broker-impl/${project.version} diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 5310db30a7..d6140f1380 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -208,6 +208,7 @@ 0.7.0-SNAPSHOT 0.12.0 0.9.7 + 3.3.0 @@ -317,6 +318,12 @@ guava ${guava.version} + + com.lmax + disruptor + ${lmax.version} + + com.jcabi diff --git a/opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java b/opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java index 9b6d5836f0..a64e3600f5 100644 --- a/opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java +++ b/opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java @@ -10,8 +10,8 @@ package org.opendaylight.controller.test.sal.binding.it; import static org.ops4j.pax.exam.CoreOptions.frameworkProperty; import static org.ops4j.pax.exam.CoreOptions.junitBundles; import static org.ops4j.pax.exam.CoreOptions.mavenBundle; +import static org.ops4j.pax.exam.CoreOptions.systemPackages; import static org.ops4j.pax.exam.CoreOptions.systemProperty; - import org.ops4j.pax.exam.Option; import org.ops4j.pax.exam.options.DefaultCompositeOption; import org.ops4j.pax.exam.util.PathUtils; @@ -47,7 +47,7 @@ public class TestHelper { bindingAwareSalBundles(), mavenBundle("commons-codec", "commons-codec").versionAsInProject(), - systemProperty("org.osgi.framework.system.packages.extra").value("sun.nio.ch"), + systemPackages("sun.nio.ch", "sun.misc"), mavenBundle("io.netty", "netty-common").versionAsInProject(), // mavenBundle("io.netty", "netty-buffer").versionAsInProject(), // mavenBundle("io.netty", "netty-handler").versionAsInProject(), // @@ -123,7 +123,8 @@ public class TestHelper { mavenBundle(CONTROLLER, "sal-common-util").versionAsInProject(), // // - mavenBundle(CONTROLLER, "sal-inmemory-datastore").versionAsInProject(), // / + mavenBundle("com.lmax", "disruptor").versionAsInProject(), + mavenBundle(CONTROLLER, "sal-inmemory-datastore").versionAsInProject(), // mavenBundle(CONTROLLER, "sal-broker-impl").versionAsInProject(), // // mavenBundle(CONTROLLER, "sal-core-spi").versionAsInProject().update(), // diff --git a/opendaylight/md-sal/sal-dom-broker/pom.xml b/opendaylight/md-sal/sal-dom-broker/pom.xml index a824792cf8..477ddeabdf 100644 --- a/opendaylight/md-sal/sal-dom-broker/pom.xml +++ b/opendaylight/md-sal/sal-dom-broker/pom.xml @@ -15,8 +15,8 @@ guava - junit - junit + com.lmax + disruptor org.opendaylight.controller @@ -60,6 +60,10 @@ ietf-yang-types + + junit + junit + org.slf4j slf4j-api diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouter.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouter.java new file mode 100644 index 0000000000..aac425b3d4 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouter.java @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.ImmutableMultimap.Builder; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.InsufficientCapacityException; +import com.lmax.disruptor.SleepingWaitStrategy; +import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +/** + * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides + * routing of notifications from publishers to subscribers. + * + * Internal implementation works by allocating a two-handler Disruptor. The first handler delivers notifications + * to subscribed listeners and the second one notifies whoever may be listening on the returned future. Registration + * state tracking is performed by a simple immutable multimap -- when a registration or unregistration occurs we + * re-generate the entire map from scratch and set it atomically. While registrations/unregistrations synchronize + * on this instance, notifications do not take any locks here. + * + * The fully-blocking {@link #publish(long, DOMNotification, Collection)} and non-blocking {@link #offerNotification(DOMNotification)} + * are realized using the Disruptor's native operations. The bounded-blocking {@link #offerNotification(DOMNotification, long, TimeUnit)} + * is realized by arming a background wakeup interrupt. + */ +public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService, DOMNotificationService { + private static final ListenableFuture NO_LISTENERS = Futures.immediateFuture(null); + private static final WaitStrategy DEFAULT_STRATEGY = new SleepingWaitStrategy(); + private static final EventHandler DISPATCH_NOTIFICATIONS = new EventHandler() { + @Override + public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) throws Exception { + event.deliverNotification(); + + } + }; + private static final EventHandler NOTIFY_FUTURE = new EventHandler() { + @Override + public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) { + event.setFuture(); + } + }; + + private final Disruptor disruptor; + private final ExecutorService executor; + private volatile Multimap> listeners = ImmutableMultimap.of(); + + private DOMNotificationRouter(final ExecutorService executor, final Disruptor disruptor) { + this.executor = Preconditions.checkNotNull(executor); + this.disruptor = Preconditions.checkNotNull(disruptor); + } + + @SuppressWarnings("unchecked") + public static DOMNotificationRouter create(final int queueDepth) { + final ExecutorService executor = Executors.newCachedThreadPool(); + final Disruptor disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, DEFAULT_STRATEGY); + + disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE); + disruptor.start(); + + return new DOMNotificationRouter(executor, disruptor); + } + + @Override + public synchronized ListenerRegistration registerNotificationListener(final T listener, final Collection types) { + final ListenerRegistration reg = new AbstractListenerRegistration(listener) { + @Override + protected void removeRegistration() { + final ListenerRegistration me = this; + + synchronized (DOMNotificationRouter.this) { + listeners = ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, new Predicate>() { + @Override + public boolean apply(final ListenerRegistration input) { + return input != me; + } + })); + } + } + }; + + if (!types.isEmpty()) { + final Builder> b = ImmutableMultimap.builder(); + b.putAll(listeners); + + for (SchemaPath t : types) { + b.put(t, reg); + } + + listeners = b.build(); + } + + return reg; + } + + @Override + public ListenerRegistration registerNotificationListener(final T listener, final SchemaPath... types) { + return registerNotificationListener(listener, Arrays.asList(types)); + } + + private ListenableFuture publish(final long seq, final DOMNotification notification, final Collection> subscribers) { + final DOMNotificationRouterEvent event = disruptor.get(seq); + final ListenableFuture future = event.initialize(notification, subscribers); + disruptor.getRingBuffer().publish(seq); + return future; + } + + @Override + public ListenableFuture putNotification(final DOMNotification notification) throws InterruptedException { + final Collection> subscribers = listeners.get(notification.getType()); + if (subscribers.isEmpty()) { + return NO_LISTENERS; + } + + final long seq = disruptor.getRingBuffer().next(); + return publish(seq, notification, subscribers); + } + + private ListenableFuture tryPublish(final DOMNotification notification, final Collection> subscribers) { + final long seq; + try { + seq = disruptor.getRingBuffer().tryNext(); + } catch (InsufficientCapacityException e) { + return DOMNotificationPublishService.REJECTED; + } + + return publish(seq, notification, subscribers); + } + + @Override + public ListenableFuture offerNotification(final DOMNotification notification) { + final Collection> subscribers = listeners.get(notification.getType()); + if (subscribers.isEmpty()) { + return NO_LISTENERS; + } + + return tryPublish(notification, subscribers); + } + + @Override + public ListenableFuture offerNotification(final DOMNotification notification, final long timeout, + final TimeUnit unit) throws InterruptedException { + final Collection> subscribers = listeners.get(notification.getType()); + if (subscribers.isEmpty()) { + return NO_LISTENERS; + } + + // Attempt to perform a non-blocking publish first + final ListenableFuture noBlock = tryPublish(notification, subscribers); + if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) { + return noBlock; + } + + /* + * FIXME: we need a background thread, which will watch out for blocking too long. Here + * we will arm a tasklet for it and synchronize delivery of interrupt properly. + */ + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void close() { + disruptor.shutdown(); + executor.shutdown(); + } +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouterEvent.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouterEvent.java new file mode 100644 index 0000000000..65c7166ac9 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMNotificationRouterEvent.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.lmax.disruptor.EventFactory; +import java.util.Collection; +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; + +/** + * A single notification event in the disruptor ringbuffer. These objects are reused, + * so they do have mutable state. + */ +final class DOMNotificationRouterEvent { + public static final EventFactory FACTORY = new EventFactory() { + @Override + public DOMNotificationRouterEvent newInstance() { + return new DOMNotificationRouterEvent(); + } + }; + + private Collection> subscribers; + private DOMNotification notification; + private SettableFuture future; + + private DOMNotificationRouterEvent() { + // Hidden on purpose, initialized in initialize() + } + + ListenableFuture initialize(final DOMNotification notification, final Collection> subscribers) { + this.notification = Preconditions.checkNotNull(notification); + this.subscribers = Preconditions.checkNotNull(subscribers); + this.future = SettableFuture.create(); + return this.future; + } + + void deliverNotification() { + for (ListenerRegistration r : subscribers) { + final DOMNotificationListener l = r.getInstance(); + if (l != null) { + l.onNotification(notification); + } + } + } + + void setFuture() { + future.set(null); + } + +} \ No newline at end of file