Bug 8531:Implementation for offerNotification method with timeout in DOMNotificationR... 08/59008/16
authorMerlinChan <chen.mingling@zte.com.cn>
Thu, 15 Jun 2017 09:35:13 +0000 (17:35 +0800)
committerTom Pantelis <tompantelis@gmail.com>
Mon, 3 Jul 2017 10:21:32 +0000 (10:21 +0000)
Signed-off-by: MerlinChan <chen.mingling@zte.com.cn>
Change-Id: Ia2b40907b222bc0b99e930af3b117e88d85c3958

dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java
dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouterTest.java

index 9775a512a31262e552bd6b70f31d4efa93a886d1..78ba3cc19b6b75ac1a7a40396f49d0b2654ec904 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.InsufficientCapacityException;
 import com.lmax.disruptor.PhasedBackoffWaitStrategy;
@@ -28,6 +29,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
@@ -42,6 +45,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
  * routing of notifications from publishers to subscribers.
@@ -60,7 +64,7 @@ import org.slf4j.LoggerFactory;
  * #offerNotification(DOMNotification, long, TimeUnit)}
  * is realized by arming a background wakeup interrupt.
  */
-public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
+public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
 
     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
@@ -78,11 +82,14 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
             DOMNotificationListener>> listeners = ImmutableMultimap.of();
     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
             ListenerRegistry.create();
+    private final ScheduledThreadPoolExecutor observer;
 
     @SuppressWarnings("unchecked")
-    private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
+    @VisibleForTesting
+    DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
         this.executor = Preconditions.checkNotNull(executor);
-
+        this.observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
+                .setDaemon(true).setNameFormat("DOMNotificationRouter-%d").build());
         disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY,
                 queueDepth, executor, ProducerType.MULTI, strategy);
         disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
@@ -194,7 +201,8 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
+    @VisibleForTesting
+    ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
             final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
         final long seq;
         try {
@@ -225,22 +233,29 @@ public final class DOMNotificationRouter implements AutoCloseable, DOMNotificati
         if (subscribers.isEmpty()) {
             return NO_LISTENERS;
         }
-
         // Attempt to perform a non-blocking publish first
-        final ListenableFuture<? extends Object> noBlock = tryPublish(notification, subscribers);
+        final ListenableFuture<?> noBlock = tryPublish(notification, subscribers);
         if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
             return noBlock;
         }
 
-        /*
-         * FIXME: we need a background thread, which will watch out for blocking too long. Here
-         *        we will arm a tasklet for it and synchronize delivery of interrupt properly.
-         */
-        throw new UnsupportedOperationException("Not implemented yet");
+        try {
+            final Thread publishThread = Thread.currentThread();
+            ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
+            final ListenableFuture<?> withBlock = putNotification(notification);
+            timerTask.cancel(true);
+            if (observer.getQueue().size() > 50) {
+                observer.purge();
+            }
+            return withBlock;
+        } catch (InterruptedException e) {
+            return DOMNotificationPublishService.REJECTED;
+        }
     }
 
     @Override
     public void close() {
+        observer.shutdown();
         disruptor.shutdown();
         executor.shutdown();
     }
index ae21c94ddb0f57390758a17c24e2a4bca2d9902a..e435d33c3f1c9814cb00945da1e3e3f7acb5f9d7 100644 (file)
@@ -7,25 +7,42 @@
  */
 package org.opendaylight.mdsal.dom.broker;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.lmax.disruptor.PhasedBackoffWaitStrategy;
+import com.lmax.disruptor.WaitStrategy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
+
 import org.junit.Test;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 public class DOMNotificationRouterTest extends TestUtils {
 
+    private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(
+            1L, 30L, TimeUnit.MILLISECONDS);
+
     @Test
     public void create() throws Exception {
         assertNotNull(DOMNotificationRouter.create(1,1,1,TimeUnit.SECONDS));
@@ -37,7 +54,8 @@ public class DOMNotificationRouterTest extends TestUtils {
     public void complexTest() throws Exception {
         final DOMNotificationSubscriptionListener domNotificationSubscriptionListener =
                 mock(DOMNotificationSubscriptionListener.class);
-        final DOMNotificationListener domNotificationListener = new TestListener();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final DOMNotificationListener domNotificationListener = new TestListener(latch);
         final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
 
         Multimap<SchemaPath, ?> listeners = domNotificationRouter.listeners();
@@ -87,6 +105,29 @@ public class DOMNotificationRouterTest extends TestUtils {
         assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testOfferNotificationWithBlocking() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final TestListener testListener = new TestListener(latch);
+        final DOMNotification domNotification = mock(DOMNotification.class);
+        doReturn("test").when(domNotification).toString();
+        doReturn(SchemaPath.ROOT).when(domNotification).getType();
+        doReturn(TEST_CHILD).when(domNotification).getBody();
+        final TestRouter testRouter = TestRouter.create(1);
+        assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.ROOT));
+        assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.SAME));
+
+        assertNotEquals(DOMNotificationPublishService.REJECTED,
+                testRouter.offerNotification(domNotification, 3, TimeUnit.SECONDS));
+        assertTrue("Listener was not notified", latch.await(5, TimeUnit.SECONDS));
+        assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+
+        assertEquals(DOMNotificationPublishService.REJECTED,
+                testRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+        assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+
+    }
+
     @Test
     public void close() throws Exception {
         final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
@@ -98,8 +139,47 @@ public class DOMNotificationRouterTest extends TestUtils {
         assertTrue(executor.isShutdown());
     }
 
-    private class TestListener implements DOMNotificationListener {
+    private static class TestListener implements DOMNotificationListener {
+        private final CountDownLatch latch;
+        private final List<DOMNotification>  receivedNotifications = new ArrayList<>();
+
+        TestListener(final CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        @Override
+        public void onNotification(@Nonnull final DOMNotification notification) {
+            receivedNotifications.add(notification);
+            latch.countDown();
+        }
+
+        public List<DOMNotification> getReceivedNotifications() {
+            return receivedNotifications;
+        }
+    }
+
+    private static class TestRouter extends DOMNotificationRouter {
+
+        TestRouter(ExecutorService executor, int queueDepth, WaitStrategy strategy) {
+            super(executor, queueDepth, strategy);
+        }
+
+        protected ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
+                final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+            return DOMNotificationPublishService.REJECTED;
+        }
+
         @Override
-        public void onNotification(@Nonnull final DOMNotification notification) {}
+        public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
+                throws InterruptedException {
+            Thread.sleep(2000);
+            return super.putNotification(notification);
+        }
+
+        public static TestRouter create(int queueDepth) {
+            final ExecutorService executor = Executors.newCachedThreadPool();
+
+            return new TestRouter(executor, queueDepth, DEFAULT_STRATEGY);
+        }
     }
-}
\ No newline at end of file
+}