2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.broker;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
19 import com.google.common.collect.Multimap;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.lmax.disruptor.PhasedBackoffWaitStrategy;
22 import com.lmax.disruptor.WaitStrategy;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.Test;
30 import org.opendaylight.mdsal.dom.api.DOMNotification;
31 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
32 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
33 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
34 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
35 import org.opendaylight.yangtools.util.ListenerRegistry;
36 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
38 public class DOMNotificationRouterTest extends TestUtils {
40 private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(
41 1L, 30L, TimeUnit.MILLISECONDS);
44 public void create() throws Exception {
45 assertNotNull(DOMNotificationRouter.create(1,1,1,TimeUnit.SECONDS));
46 assertNotNull(DOMNotificationRouter.create(1));
49 @SuppressWarnings("checkstyle:IllegalCatch")
51 public void complexTest() throws Exception {
52 final DOMNotificationSubscriptionListener domNotificationSubscriptionListener =
53 mock(DOMNotificationSubscriptionListener.class);
54 final CountDownLatch latch = new CountDownLatch(1);
55 final DOMNotificationListener domNotificationListener = new TestListener(latch);
56 final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
58 Multimap<SchemaPath, ?> listeners = domNotificationRouter.listeners();
60 assertTrue(listeners.isEmpty());
61 assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.ROOT));
62 assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.SAME));
64 listeners = domNotificationRouter.listeners();
66 assertFalse(listeners.isEmpty());
68 ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
69 domNotificationRouter.subscriptionListeners();
71 assertEquals(0, subscriptionListeners.streamListeners().count());
72 assertNotNull(domNotificationRouter.registerSubscriptionListener(domNotificationSubscriptionListener));
74 subscriptionListeners = domNotificationRouter.subscriptionListeners();
75 assertSame(domNotificationSubscriptionListener,
76 subscriptionListeners.streamListeners().findAny().orElseThrow());
78 final DOMNotification domNotification = mock(DOMNotification.class);
79 doReturn("test").when(domNotification).toString();
80 doReturn(SchemaPath.ROOT).when(domNotification).getType();
81 doReturn(TEST_CHILD).when(domNotification).getBody();
83 assertNotNull(domNotificationRouter.offerNotification(domNotification));
86 assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
87 assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
88 } catch (Exception e) {
89 assertTrue(e instanceof UnsupportedOperationException);
92 assertNotNull(domNotificationRouter.putNotification(domNotification));
96 public void offerNotification() throws Exception {
97 final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
98 final DOMNotification domNotification = mock(DOMNotification.class);
99 doReturn(SchemaPath.ROOT).when(domNotification).getType();
100 doReturn(TEST_CHILD).when(domNotification).getBody();
101 assertNotNull(domNotificationRouter.putNotification(domNotification));
102 assertNotNull(domNotificationRouter.offerNotification(domNotification));
103 assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
107 public void testOfferNotificationWithBlocking() throws Exception {
108 final CountDownLatch latch = new CountDownLatch(1);
109 final TestListener testListener = new TestListener(latch);
110 final DOMNotification domNotification = mock(DOMNotification.class);
111 doReturn("test").when(domNotification).toString();
112 doReturn(SchemaPath.ROOT).when(domNotification).getType();
113 doReturn(TEST_CHILD).when(domNotification).getBody();
115 try (TestRouter testRouter = new TestRouter(1)) {
116 assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.ROOT));
117 assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.SAME));
119 assertNotEquals(DOMNotificationPublishService.REJECTED,
120 testRouter.offerNotification(domNotification, 3, TimeUnit.SECONDS));
121 assertTrue("Listener was not notified", latch.await(5, TimeUnit.SECONDS));
122 assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
124 assertEquals(DOMNotificationPublishService.REJECTED,
125 testRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
126 assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
131 public void close() throws Exception {
132 final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
133 final ExecutorService executor = domNotificationRouter.executor();
134 final ExecutorService observer = domNotificationRouter.observer();
136 assertFalse(executor.isShutdown());
137 assertFalse(observer.isShutdown());
138 domNotificationRouter.close();
139 assertTrue(executor.isShutdown());
140 assertTrue(observer.isShutdown());
143 private static class TestListener implements DOMNotificationListener {
144 private final CountDownLatch latch;
145 private final List<DOMNotification> receivedNotifications = new ArrayList<>();
147 TestListener(final CountDownLatch latch) {
152 public void onNotification(final DOMNotification notification) {
153 receivedNotifications.add(notification);
157 public List<DOMNotification> getReceivedNotifications() {
158 return receivedNotifications;
162 private static class TestRouter extends DOMNotificationRouter {
163 TestRouter(final int queueDepth) {
164 super(queueDepth, DEFAULT_STRATEGY);
168 protected ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
169 final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
170 return DOMNotificationPublishService.REJECTED;
174 public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
175 throws InterruptedException {
177 return super.putNotification(notification);