2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.broker;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
18 import com.google.common.collect.Multimap;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.lmax.disruptor.PhasedBackoffWaitStrategy;
21 import com.lmax.disruptor.WaitStrategy;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.TimeUnit;
30 import javax.annotation.Nonnull;
32 import org.junit.Test;
33 import org.opendaylight.mdsal.dom.api.DOMNotification;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
35 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
36 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
37 import org.opendaylight.yangtools.concepts.ListenerRegistration;
38 import org.opendaylight.yangtools.util.ListenerRegistry;
39 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
41 public class DOMNotificationRouterTest extends TestUtils {
43 private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(
44 1L, 30L, TimeUnit.MILLISECONDS);
47 public void create() throws Exception {
48 assertNotNull(DOMNotificationRouter.create(1,1,1,TimeUnit.SECONDS));
49 assertNotNull(DOMNotificationRouter.create(1));
52 @SuppressWarnings("checkstyle:IllegalCatch")
54 public void complexTest() throws Exception {
55 final DOMNotificationSubscriptionListener domNotificationSubscriptionListener =
56 mock(DOMNotificationSubscriptionListener.class);
57 final CountDownLatch latch = new CountDownLatch(1);
58 final DOMNotificationListener domNotificationListener = new TestListener(latch);
59 final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
61 Multimap<SchemaPath, ?> listeners = domNotificationRouter.listeners();
63 assertTrue(listeners.isEmpty());
64 assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.ROOT));
65 assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.SAME));
67 listeners = domNotificationRouter.listeners();
69 assertFalse(listeners.isEmpty());
71 ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
72 domNotificationRouter.subscriptionListeners();
74 assertFalse(subscriptionListeners.iterator().hasNext());
75 assertNotNull(domNotificationRouter.registerSubscriptionListener(domNotificationSubscriptionListener));
77 subscriptionListeners = domNotificationRouter.subscriptionListeners();
78 assertTrue(subscriptionListeners.iterator().hasNext());
80 final DOMNotification domNotification = mock(DOMNotification.class);
81 doReturn("test").when(domNotification).toString();
82 doReturn(SchemaPath.ROOT).when(domNotification).getType();
83 doReturn(TEST_CHILD).when(domNotification).getBody();
85 assertNotNull(domNotificationRouter.offerNotification(domNotification));
88 assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
89 assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
90 } catch (Exception e) {
91 assertTrue(e instanceof UnsupportedOperationException);
94 assertNotNull(domNotificationRouter.putNotification(domNotification));
98 public void offerNotification() throws Exception {
99 final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
100 final DOMNotification domNotification = mock(DOMNotification.class);
101 doReturn(SchemaPath.ROOT).when(domNotification).getType();
102 doReturn(TEST_CHILD).when(domNotification).getBody();
103 assertNotNull(domNotificationRouter.putNotification(domNotification));
104 assertNotNull(domNotificationRouter.offerNotification(domNotification));
105 assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
109 public void testOfferNotificationWithBlocking() throws Exception {
110 final CountDownLatch latch = new CountDownLatch(1);
111 final TestListener testListener = new TestListener(latch);
112 final DOMNotification domNotification = mock(DOMNotification.class);
113 doReturn("test").when(domNotification).toString();
114 doReturn(SchemaPath.ROOT).when(domNotification).getType();
115 doReturn(TEST_CHILD).when(domNotification).getBody();
116 final TestRouter testRouter = TestRouter.create(1);
117 assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.ROOT));
118 assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.SAME));
120 assertNotEquals(DOMNotificationPublishService.REJECTED,
121 testRouter.offerNotification(domNotification, 3, TimeUnit.SECONDS));
122 assertTrue("Listener was not notified", latch.await(5, TimeUnit.SECONDS));
123 assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
125 assertEquals(DOMNotificationPublishService.REJECTED,
126 testRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
127 assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
132 public void close() throws Exception {
133 final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
135 final ExecutorService executor = domNotificationRouter.executor();
137 assertFalse(executor.isShutdown());
138 domNotificationRouter.close();
139 assertTrue(executor.isShutdown());
142 private static class TestListener implements DOMNotificationListener {
143 private final CountDownLatch latch;
144 private final List<DOMNotification> receivedNotifications = new ArrayList<>();
146 TestListener(final CountDownLatch latch) {
151 public void onNotification(@Nonnull final DOMNotification notification) {
152 receivedNotifications.add(notification);
156 public List<DOMNotification> getReceivedNotifications() {
157 return receivedNotifications;
161 private static class TestRouter extends DOMNotificationRouter {
163 TestRouter(ExecutorService executor, int queueDepth, WaitStrategy strategy) {
164 super(executor, queueDepth, strategy);
167 protected ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
168 final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
169 return DOMNotificationPublishService.REJECTED;
173 public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
174 throws InterruptedException {
176 return super.putNotification(notification);
179 public static TestRouter create(int queueDepth) {
180 final ExecutorService executor = Executors.newCachedThreadPool();
182 return new TestRouter(executor, queueDepth, DEFAULT_STRATEGY);