2 * Copyright (c) 2017 - 2018 Red Hat, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.datastoreutils.listeners.tests;
10 import static com.google.common.truth.Truth.assertThat;
11 import static org.awaitility.Awaitility.await;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Matchers.anyBoolean;
14 import static org.mockito.Matchers.anyCollection;
15 import static org.mockito.Matchers.anyLong;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.doAnswer;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.TOP_FOO_KEY;
24 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.USES_ONE_KEY;
25 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.complexUsesAugment;
26 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.path;
27 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.topLevelList;
28 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
30 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
31 import com.google.common.util.concurrent.MoreExecutors;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.time.Duration;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ScheduledFuture;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.function.Function;
40 import org.junit.Rule;
41 import org.junit.Test;
42 import org.mockito.ArgumentCaptor;
43 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
44 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar;
45 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar.NextAction;
46 import org.opendaylight.genius.datastoreutils.listeners.internal.DataTreeEventCallbackRegistrarImpl;
47 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
48 import org.opendaylight.infrautils.testutils.LogCaptureRule;
49 import org.opendaylight.infrautils.testutils.LogRule;
50 import org.opendaylight.mdsal.binding.api.DataBroker;
51 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
52 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
53 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest;
54 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
56 import org.opendaylight.yangtools.concepts.ListenerRegistration;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
62 * Test for {@link DataTreeEventCallbackRegistrarImpl}.
64 * @author Michael Vorburger.ch
66 public class DataTreeEventCallbackRegistrarTest {
68 // TODO add similar tests as for onAdd() also for onUpdate() and onDelete() and onAddOrUpdate()
70 private static final Logger LOG = LoggerFactory.getLogger(DataTreeEventCallbackRegistrarTest.class);
72 private static final InstanceIdentifier<TopLevelList> FOO_PATH = path(TOP_FOO_KEY);
73 private static final TopLevelList FOO_DATA = topLevelList(TOP_FOO_KEY, complexUsesAugment(USES_ONE_KEY));
75 public @Rule LogRule logRule = new LogRule();
76 public @Rule LogCaptureRule logCaptureRule = new LogCaptureRule();
78 private final DataBroker db;
79 private final SingleTransactionDataBroker db1;
81 public DataTreeEventCallbackRegistrarTest() throws Exception {
82 // Argument true to make sure we use the multi-threaded DataTreeChangeListenerExecutor
83 // because otherwise we hit a deadlock :( with this test!
84 /*ConstantSchemaAbstractDataBrokerTest dataBrokerTest = new ConstantSchemaAbstractDataBrokerTest(true) {
86 protected Set<YangModuleInfo> getModuleInfos() throws Exception {
87 return ImmutableSet.of(BindingReflections.getModuleInfo(TwoLevelList.class),
88 BindingReflections.getModuleInfo(TreeComplexUsesAugment.class));
92 dataBrokerTest.setup();
93 db = dataBrokerTest.getDataBroker();
94 db1 = new SingleTransactionDataBroker(db);*/
95 AbstractConcurrentDataBrokerTest dataBrokerTest =
96 new AbstractConcurrentDataBrokerTest(true) {};
97 dataBrokerTest.setup();
98 db = dataBrokerTest.getDataBroker();
99 db1 = new SingleTransactionDataBroker(db);
104 public void testAddAndUnregister() throws TransactionCommitFailedException {
105 checkAdd(NextAction.UNREGISTER);
109 public void testAddAndKeepRegistered() throws TransactionCommitFailedException {
110 checkAdd(NextAction.CALL_AGAIN);
114 public void testAddOrUpdateAdd() throws TransactionCommitFailedException {
115 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
116 AtomicBoolean added = new AtomicBoolean(false);
117 dataTreeEventCallbackRegistrar.onAddOrUpdate(OPERATIONAL, FOO_PATH, (first, second) -> {
118 if (first == null && second != null) {
121 return NextAction.UNREGISTER;
123 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
124 await().untilTrue(added);
129 public void testAddOrUpdateUpdate() throws TransactionCommitFailedException {
130 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
131 AtomicBoolean updated = new AtomicBoolean(false);
132 dataTreeEventCallbackRegistrar.onAddOrUpdate(OPERATIONAL, FOO_PATH, (first, second) -> {
133 if (first != null && second != null) {
135 return NextAction.UNREGISTER;
137 return NextAction.CALL_AGAIN;
139 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
140 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
141 await().untilTrue(updated);
145 @SuppressWarnings("unchecked")
146 private void checkAdd(NextAction nextAction) throws TransactionCommitFailedException {
147 DataBroker spiedDataBroker = spy(db);
149 ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
150 doAnswer(invocation -> {
151 ListenerRegistration<?> realReg = db.registerDataTreeChangeListener(
152 invocation.getArgument(0),
153 invocation.getArgument(1));
154 doAnswer(ignored -> {
157 }).when(mockListenerReg).close();
158 return mockListenerReg;
159 }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
161 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
162 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
164 AtomicBoolean added = new AtomicBoolean(false);
165 dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> {
166 if (topLevelList.equals(FOO_DATA)) {
169 LOG.error("Expected: {} but was: {}", FOO_DATA, topLevelList);
170 assertThat(topLevelList).isEqualTo(FOO_DATA);
175 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
176 await().untilTrue(added);
178 if (nextAction.equals(NextAction.UNREGISTER)) {
179 verify(mockListenerReg).close();
182 db1.syncDelete(OPERATIONAL, FOO_PATH);
184 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
185 await().untilTrue(added);
186 verify(mockListenerReg, never()).close();
191 public void testAddWithTimeoutWhichExpires() throws InterruptedException {
192 DataBroker spiedDataBroker = spy(db);
194 ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
195 doReturn(mockListenerReg).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
197 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
198 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
200 AtomicBoolean timedOut = new AtomicBoolean(false);
201 dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> { /* NOOP */ },
202 Duration.ofMillis(50), iid -> {
203 if (iid.equals(DataTreeIdentifier.create(OPERATIONAL, FOO_PATH))) {
210 await().untilTrue(timedOut);
211 verify(mockListenerReg).close();
215 public void testAddWithTimeoutNeverHits() throws TransactionCommitFailedException, InterruptedException {
216 AtomicBoolean added = new AtomicBoolean(false);
217 AtomicBoolean timedOut = new AtomicBoolean(false);
218 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
219 dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> {
221 }, Duration.ofMillis(3000), iid -> timedOut.set(true));
223 // This test is timing sensitive, and a too low timeout value (above), or slow machine, could make this fail :(
224 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
225 await().untilTrue(added);
226 await().untilFalse(timedOut);
229 @SuppressWarnings({ "unchecked", "rawtypes" })
231 public void testExceptionInCallbackMustBeLogged() throws TransactionCommitFailedException, InterruptedException {
232 logCaptureRule.expectLastErrorMessageContains("Error invoking worker");
234 DataBroker spiedDataBroker = spy(db);
235 final DataTreeChangeListener mockListener = mock(DataTreeChangeListener.class, "TestListener");
236 doAnswer(invocation -> db.registerDataTreeChangeListener(invocation.getArgument(0),
237 mockListener)).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
239 AtomicBoolean added = new AtomicBoolean(false);
240 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
241 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
242 dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH,
243 (Function<TopLevelList, NextAction>) topLevelList -> {
245 throw new IllegalStateException("TEST");
248 ArgumentCaptor<DataTreeChangeListener> realListener = ArgumentCaptor.forClass(DataTreeChangeListener.class);
249 verify(spiedDataBroker).registerDataTreeChangeListener(any(), realListener.capture());
251 AtomicBoolean onDataTreeChangeDone = new AtomicBoolean(false);
252 doAnswer(invocation -> {
254 realListener.getValue().onDataTreeChanged(invocation.getArgument(0));
256 onDataTreeChangeDone.set(true);
259 }).when(mockListener).onDataTreeChanged(anyCollection());
261 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
262 await().untilTrue(added);
263 await().untilTrue(onDataTreeChangeDone);
267 public void testTimeoutCallbackNotInvokedWhileHandlingChangeNotificationForUnregister() {
268 testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction.UNREGISTER);
272 public void testTimeoutCallbackIsInvokedWhileHandlingChangeNotificationForCallAgain() {
273 testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction.CALL_AGAIN);
276 private void testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction nextAction) {
277 Duration timeout = Duration.ofMillis(10);
279 ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
280 ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
281 doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
283 ListeningScheduledExecutorService directExecutorService = MoreExecutors.listeningDecorator(mockScheduler);
285 DataTreeEventCallbackRegistrar callbackRegistrar =
286 new DataTreeEventCallbackRegistrarImpl(db, directExecutorService);
288 CountDownLatch inChangeCallback = new CountDownLatch(1);
289 CountDownLatch changeCallbackContinue = new CountDownLatch(1);
290 AtomicBoolean updated = new AtomicBoolean(false);
291 AtomicBoolean timedOut = new AtomicBoolean(false);
292 callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
293 inChangeCallback.countDown();
294 Uninterruptibles.awaitUninterruptibly(changeCallbackContinue);
296 // Sleep a bit for the timeout task - see below.
297 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
300 }, timeout, id -> timedOut.set(true));
302 ArgumentCaptor<Runnable> timerTask = ArgumentCaptor.forClass(Runnable.class);
303 verify(mockScheduler).schedule(timerTask.capture(), eq(timeout.toMillis()), eq(TimeUnit.MILLISECONDS));
305 new RetryingManagedNewTransactionRunner(db, 1).callWithNewWriteOnlyTransactionAndSubmit(
306 tx -> tx.mergeParentStructurePut(OPERATIONAL, FOO_PATH, FOO_DATA));
308 // Wait for the change notification callback to be invoked.
310 assertThat(Uninterruptibles.awaitUninterruptibly(inChangeCallback, 5, TimeUnit.SECONDS)).isTrue();
312 // Now artificially fire the timeout task on a separate thread.
314 CountDownLatch timerTaskDone = new CountDownLatch(1);
316 // We have to tell the notification change callback to continue prior to invoking the timeout task as
317 // the latter should block internally in DataTreeEventCallbackRegistrarImpl while the change notification
318 // is still in progress. The change callback sleeps a bit to give the timeout task plenty of time to
319 // complete if it didn't block.
320 changeCallbackContinue.countDown();
321 timerTask.getValue().run();
322 timerTaskDone.countDown();
325 await().timeout(5, TimeUnit.SECONDS).untilTrue(updated);
327 assertThat(Uninterruptibles.awaitUninterruptibly(timerTaskDone, 5, TimeUnit.SECONDS)).isTrue();
329 if (nextAction.equals(NextAction.UNREGISTER)) {
330 assertThat(timedOut.get()).isFalse();
331 verify(mockScheduledFuture).cancel(anyBoolean());
333 assertThat(timedOut.get()).isTrue();
334 verify(mockScheduledFuture, never()).cancel(anyBoolean());
338 @SuppressWarnings({ "rawtypes", "unchecked" })
340 public void testChangeCallbackNotInvokedAfterTimeout() {
341 Duration timeout = Duration.ofMillis(10);
343 ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
344 ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
345 doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
347 ListeningScheduledExecutorService directExecutorService = MoreExecutors.listeningDecorator(mockScheduler);
349 DataBroker spiedDataBroker = spy(db);
351 final DataTreeChangeListener mockListener = mock(DataTreeChangeListener.class);
352 doAnswer(invocation -> {
353 db.registerDataTreeChangeListener(invocation.getArgument(0), mockListener);
354 return mock(ListenerRegistration.class);
355 }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
357 DataTreeEventCallbackRegistrar callbackRegistrar =
358 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker, directExecutorService);
360 AtomicBoolean updated = new AtomicBoolean(false);
361 AtomicBoolean timedOut = new AtomicBoolean(false);
362 callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
364 return NextAction.UNREGISTER;
365 }, timeout, id -> timedOut.set(true));
367 ArgumentCaptor<Runnable> timerTask = ArgumentCaptor.forClass(Runnable.class);
368 verify(mockScheduler).schedule(timerTask.capture(), eq(timeout.toMillis()), eq(TimeUnit.MILLISECONDS));
370 ArgumentCaptor<DataTreeChangeListener> realListener = ArgumentCaptor.forClass(DataTreeChangeListener.class);
371 verify(spiedDataBroker).registerDataTreeChangeListener(any(), realListener.capture());
373 timerTask.getValue().run();
374 assertThat(timedOut.get()).isTrue();
376 AtomicBoolean onDataTreeChangeDone = new AtomicBoolean(false);
377 doAnswer(invocation -> {
379 realListener.getValue().onDataTreeChanged(invocation.getArgument(0));
381 onDataTreeChangeDone.set(true);
384 }).when(mockListener).onDataTreeChanged(anyCollection());
386 new RetryingManagedNewTransactionRunner(db, 1).callWithNewWriteOnlyTransactionAndSubmit(
387 tx -> tx.mergeParentStructurePut(OPERATIONAL, FOO_PATH, FOO_DATA));
389 await().untilTrue(onDataTreeChangeDone);
390 assertThat(updated.get()).isFalse();
393 @SuppressWarnings("unchecked")
395 public void testChangeCallbackOccursImmediatelyAfterRegistration() {
396 ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
397 ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
398 doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
400 DataBroker spiedDataBroker = spy(db);
402 AtomicBoolean updated = new AtomicBoolean(false);
403 ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
404 doAnswer(invocation -> {
405 DataTreeChangeListener<?> listener = invocation.getArgument(1);
406 db.registerDataTreeChangeListener(invocation.getArgument(0), listener);
407 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
408 await().untilTrue(updated);
409 return mockListenerReg;
410 }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
412 DataTreeEventCallbackRegistrar callbackRegistrar =
413 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker, mockScheduler);
415 callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
417 return NextAction.UNREGISTER;
418 }, Duration.ofMillis(10), id -> { });
420 await().untilTrue(updated);
421 verify(mockListenerReg).close();
422 verify(mockScheduledFuture).cancel(anyBoolean());