Fix unsubscribe checks in DOMNotificationRouterEvent
[mdsal.git] / dom / mdsal-dom-broker / src / test / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouterTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18
19 import com.google.common.collect.Multimap;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.lmax.disruptor.PhasedBackoffWaitStrategy;
22 import com.lmax.disruptor.WaitStrategy;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.Test;
30 import org.opendaylight.mdsal.dom.api.DOMNotification;
31 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
32 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
33 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
34 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
35 import org.opendaylight.yangtools.util.ListenerRegistry;
36 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
37
38 public class DOMNotificationRouterTest extends TestUtils {
39
40     private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(
41             1L, 30L, TimeUnit.MILLISECONDS);
42
43     @Test
44     public void create() throws Exception {
45         assertNotNull(DOMNotificationRouter.create(1,1,1,TimeUnit.SECONDS));
46         assertNotNull(DOMNotificationRouter.create(1));
47     }
48
49     @SuppressWarnings("checkstyle:IllegalCatch")
50     @Test
51     public void complexTest() throws Exception {
52         final DOMNotificationSubscriptionListener domNotificationSubscriptionListener =
53                 mock(DOMNotificationSubscriptionListener.class);
54         final CountDownLatch latch = new CountDownLatch(1);
55         final DOMNotificationListener domNotificationListener = new TestListener(latch);
56         final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
57
58         Multimap<SchemaPath, ?> listeners = domNotificationRouter.listeners();
59
60         assertTrue(listeners.isEmpty());
61         assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.ROOT));
62         assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.SAME));
63
64         listeners = domNotificationRouter.listeners();
65
66         assertFalse(listeners.isEmpty());
67
68         ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
69                 domNotificationRouter.subscriptionListeners();
70
71         assertEquals(0, subscriptionListeners.streamListeners().count());
72         assertNotNull(domNotificationRouter.registerSubscriptionListener(domNotificationSubscriptionListener));
73
74         subscriptionListeners = domNotificationRouter.subscriptionListeners();
75         assertSame(domNotificationSubscriptionListener,
76             subscriptionListeners.streamListeners().findAny().orElseThrow());
77
78         final DOMNotification domNotification = mock(DOMNotification.class);
79         doReturn("test").when(domNotification).toString();
80         doReturn(SchemaPath.ROOT).when(domNotification).getType();
81         doReturn(TEST_CHILD).when(domNotification).getBody();
82
83         assertNotNull(domNotificationRouter.offerNotification(domNotification));
84
85         try {
86             assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
87             assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
88         } catch (Exception e) {
89             assertTrue(e instanceof UnsupportedOperationException);
90         }
91
92         assertNotNull(domNotificationRouter.putNotification(domNotification));
93     }
94
95     @Test
96     public void offerNotification() throws Exception {
97         final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
98         final DOMNotification domNotification = mock(DOMNotification.class);
99         doReturn(SchemaPath.ROOT).when(domNotification).getType();
100         doReturn(TEST_CHILD).when(domNotification).getBody();
101         assertNotNull(domNotificationRouter.putNotification(domNotification));
102         assertNotNull(domNotificationRouter.offerNotification(domNotification));
103         assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
104     }
105
106     @Test
107     public void testOfferNotificationWithBlocking() throws Exception {
108         final CountDownLatch latch = new CountDownLatch(1);
109         final TestListener testListener = new TestListener(latch);
110         final DOMNotification domNotification = mock(DOMNotification.class);
111         doReturn("test").when(domNotification).toString();
112         doReturn(SchemaPath.ROOT).when(domNotification).getType();
113         doReturn(TEST_CHILD).when(domNotification).getBody();
114
115         try (TestRouter testRouter = new TestRouter(1)) {
116             assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.ROOT));
117             assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.SAME));
118
119             assertNotEquals(DOMNotificationPublishService.REJECTED,
120                 testRouter.offerNotification(domNotification, 3, TimeUnit.SECONDS));
121             assertTrue("Listener was not notified", latch.await(5, TimeUnit.SECONDS));
122             assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
123
124             assertEquals(DOMNotificationPublishService.REJECTED,
125                 testRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
126             assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
127         }
128     }
129
130     @Test
131     public void close() throws Exception {
132         final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
133         final ExecutorService executor = domNotificationRouter.executor();
134         final ExecutorService observer = domNotificationRouter.observer();
135
136         assertFalse(executor.isShutdown());
137         assertFalse(observer.isShutdown());
138         domNotificationRouter.close();
139         assertTrue(executor.isShutdown());
140         assertTrue(observer.isShutdown());
141     }
142
143     private static class TestListener implements DOMNotificationListener {
144         private final CountDownLatch latch;
145         private final List<DOMNotification>  receivedNotifications = new ArrayList<>();
146
147         TestListener(final CountDownLatch latch) {
148             this.latch = latch;
149         }
150
151         @Override
152         public void onNotification(final DOMNotification notification) {
153             receivedNotifications.add(notification);
154             latch.countDown();
155         }
156
157         public List<DOMNotification> getReceivedNotifications() {
158             return receivedNotifications;
159         }
160     }
161
162     private static class TestRouter extends DOMNotificationRouter {
163         TestRouter(final int queueDepth) {
164             super(queueDepth, DEFAULT_STRATEGY);
165         }
166
167         @Override
168         protected ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
169                 final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
170             return DOMNotificationPublishService.REJECTED;
171         }
172
173         @Override
174         public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
175                 throws InterruptedException {
176             Thread.sleep(2000);
177             return super.putNotification(notification);
178         }
179     }
180 }