import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
* routing of notifications from publishers to subscribers.
* #offerNotification(DOMNotification, long, TimeUnit)}
* is realized by arming a background wakeup interrupt.
*/
-public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
+public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
DOMNotificationListener>> listeners = ImmutableMultimap.of();
private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
ListenerRegistry.create();
+ private final ScheduledThreadPoolExecutor observer;
@SuppressWarnings("unchecked")
- private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
+ @VisibleForTesting
+ DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
this.executor = Preconditions.checkNotNull(executor);
-
+ this.observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("DOMNotificationRouter-%d").build());
disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY,
queueDepth, executor, ProducerType.MULTI, strategy);
disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
+ @VisibleForTesting
+ ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
final long seq;
try {
if (subscribers.isEmpty()) {
return NO_LISTENERS;
}
-
// Attempt to perform a non-blocking publish first
- final ListenableFuture<? extends Object> noBlock = tryPublish(notification, subscribers);
+ final ListenableFuture<?> noBlock = tryPublish(notification, subscribers);
if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
return noBlock;
}
- /*
- * FIXME: we need a background thread, which will watch out for blocking too long. Here
- * we will arm a tasklet for it and synchronize delivery of interrupt properly.
- */
- throw new UnsupportedOperationException("Not implemented yet");
+ try {
+ final Thread publishThread = Thread.currentThread();
+ ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
+ final ListenableFuture<?> withBlock = putNotification(notification);
+ timerTask.cancel(true);
+ if (observer.getQueue().size() > 50) {
+ observer.purge();
+ }
+ return withBlock;
+ } catch (InterruptedException e) {
+ return DOMNotificationPublishService.REJECTED;
+ }
}
@Override
public void close() {
+ observer.shutdown();
disruptor.shutdown();
executor.shutdown();
}
*/
package org.opendaylight.mdsal.dom.broker;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.lmax.disruptor.PhasedBackoffWaitStrategy;
+import com.lmax.disruptor.WaitStrategy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+
import org.junit.Test;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
public class DOMNotificationRouterTest extends TestUtils {
+ private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(
+ 1L, 30L, TimeUnit.MILLISECONDS);
+
@Test
public void create() throws Exception {
assertNotNull(DOMNotificationRouter.create(1,1,1,TimeUnit.SECONDS));
public void complexTest() throws Exception {
final DOMNotificationSubscriptionListener domNotificationSubscriptionListener =
mock(DOMNotificationSubscriptionListener.class);
- final DOMNotificationListener domNotificationListener = new TestListener();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final DOMNotificationListener domNotificationListener = new TestListener(latch);
final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
Multimap<SchemaPath, ?> listeners = domNotificationRouter.listeners();
assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
}
+ @Test
+ public void testOfferNotificationWithBlocking() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final TestListener testListener = new TestListener(latch);
+ final DOMNotification domNotification = mock(DOMNotification.class);
+ doReturn("test").when(domNotification).toString();
+ doReturn(SchemaPath.ROOT).when(domNotification).getType();
+ doReturn(TEST_CHILD).when(domNotification).getBody();
+ final TestRouter testRouter = TestRouter.create(1);
+ assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.ROOT));
+ assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.SAME));
+
+ assertNotEquals(DOMNotificationPublishService.REJECTED,
+ testRouter.offerNotification(domNotification, 3, TimeUnit.SECONDS));
+ assertTrue("Listener was not notified", latch.await(5, TimeUnit.SECONDS));
+ assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+
+ assertEquals(DOMNotificationPublishService.REJECTED,
+ testRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+ assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+
+ }
+
@Test
public void close() throws Exception {
final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
assertTrue(executor.isShutdown());
}
- private class TestListener implements DOMNotificationListener {
+ private static class TestListener implements DOMNotificationListener {
+ private final CountDownLatch latch;
+ private final List<DOMNotification> receivedNotifications = new ArrayList<>();
+
+ TestListener(final CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public void onNotification(@Nonnull final DOMNotification notification) {
+ receivedNotifications.add(notification);
+ latch.countDown();
+ }
+
+ public List<DOMNotification> getReceivedNotifications() {
+ return receivedNotifications;
+ }
+ }
+
+ private static class TestRouter extends DOMNotificationRouter {
+
+ TestRouter(ExecutorService executor, int queueDepth, WaitStrategy strategy) {
+ super(executor, queueDepth, strategy);
+ }
+
+ protected ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
+ final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+ return DOMNotificationPublishService.REJECTED;
+ }
+
@Override
- public void onNotification(@Nonnull final DOMNotification notification) {}
+ public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
+ throws InterruptedException {
+ Thread.sleep(2000);
+ return super.putNotification(notification);
+ }
+
+ public static TestRouter create(int queueDepth) {
+ final ExecutorService executor = Executors.newCachedThreadPool();
+
+ return new TestRouter(executor, queueDepth, DEFAULT_STRATEGY);
+ }
}
-}
\ No newline at end of file
+}