2 * Copyright (c) 2017 - 2018 Red Hat, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.datastoreutils.listeners.tests;
10 import static com.google.common.truth.Truth.assertThat;
11 import static org.awaitility.Awaitility.await;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyBoolean;
14 import static org.mockito.ArgumentMatchers.anyCollection;
15 import static org.mockito.ArgumentMatchers.anyLong;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.doAnswer;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.TOP_FOO_KEY;
24 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.USES_ONE_KEY;
25 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.complexUsesAugment;
26 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.path;
27 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.topLevelList;
28 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
30 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
31 import com.google.common.util.concurrent.MoreExecutors;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.time.Duration;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ScheduledFuture;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.function.Function;
40 import org.junit.Rule;
41 import org.junit.Test;
42 import org.mockito.ArgumentCaptor;
43 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
44 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar;
45 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar.NextAction;
46 import org.opendaylight.genius.datastoreutils.listeners.internal.DataTreeEventCallbackRegistrarImpl;
47 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
48 import org.opendaylight.infrautils.testutils.LogCaptureRule;
49 import org.opendaylight.infrautils.testutils.LogRule;
50 import org.opendaylight.mdsal.binding.api.DataBroker;
51 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
52 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
53 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest;
54 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
56 import org.opendaylight.yangtools.concepts.ListenerRegistration;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
62 * Test for {@link DataTreeEventCallbackRegistrarImpl}.
64 * @author Michael Vorburger.ch
66 public class DataTreeEventCallbackRegistrarTest {
68 // TODO Add tests similar to the onAdd() ones for onUpdate(), onDelete() and onAddOrUpdate() as well.
// Logger used by the onAdd callback in checkAdd to report data that differs from FOO_DATA.
70 private static final Logger LOG = LoggerFactory.getLogger(DataTreeEventCallbackRegistrarTest.class);
// Path and payload of the single list entry that the tests write, delete and register callbacks for.
72 private static final InstanceIdentifier<TopLevelList> FOO_PATH = path(TOP_FOO_KEY);
73 private static final TopLevelList FOO_DATA = topLevelList(TOP_FOO_KEY, complexUsesAugment(USES_ONE_KEY));
// JUnit rules; logCaptureRule is used by testExceptionInCallbackMustBeLogged to expect an error message.
75 public @Rule LogRule logRule = new LogRule();
76 public @Rule LogCaptureRule logCaptureRule = new LogCaptureRule();
// The data broker the registrar is built on, and a SingleTransactionDataBroker wrapper around it
// used for synchronous writes/deletes; both are assigned in the constructor.
78 private final DataBroker db;
79 private final SingleTransactionDataBroker db1;
// Builds the data broker shared by all tests: sets up an anonymous AbstractConcurrentDataBrokerTest,
// stores its DataBroker in db and wraps that broker in a SingleTransactionDataBroker stored in db1.
81 public DataTreeEventCallbackRegistrarTest() throws Exception {
82 // Argument true to make sure we use the multi-threaded DataTreeChangeListenerExecutor
83 // because otherwise we hit a deadlock :( with this test!
// The block comment that follows is a disabled variant of this setup based on
// ConstantSchemaAbstractDataBrokerTest; only the statements after it are active.
84 /*ConstantSchemaAbstractDataBrokerTest dataBrokerTest = new ConstantSchemaAbstractDataBrokerTest(true) {
86 protected Set<YangModuleInfo> getModuleInfos() throws Exception {
87 return ImmutableSet.of(BindingReflections.getModuleInfo(TwoLevelList.class),
88 BindingReflections.getModuleInfo(TreeComplexUsesAugment.class));
92 dataBrokerTest.setup();
93 db = dataBrokerTest.getDataBroker();
94 db1 = new SingleTransactionDataBroker(db);*/
95 AbstractConcurrentDataBrokerTest dataBrokerTest =
96 new AbstractConcurrentDataBrokerTest(true) {};
97 dataBrokerTest.setup();
98 db = dataBrokerTest.getDataBroker();
99 db1 = new SingleTransactionDataBroker(db);
// Runs the shared onAdd() scenario in checkAdd for NextAction.UNREGISTER; for that value
// checkAdd verifies that the listener registration's close() was called.
104 public void testAddAndUnregister() throws TransactionCommitFailedException {
105 checkAdd(NextAction.UNREGISTER);
// Runs the shared onAdd() scenario in checkAdd for NextAction.CALL_AGAIN; checkAdd contains steps
// that delete and re-write the entry and verify close() is never called on the registration
// (presumably the branch taken for this value - the branch structure is not fully visible).
109 public void testAddAndKeepRegistered() throws TransactionCommitFailedException {
110 checkAdd(NextAction.CALL_AGAIN);
// Registers an onAddOrUpdate() callback for FOO_PATH on the real data broker, writes FOO_DATA once
// and waits until the 'added' flag becomes true.
114 public void testAddOrUpdateAdd() throws TransactionCommitFailedException {
115 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
116 AtomicBoolean added = new AtomicBoolean(false);
117 dataTreeEventCallbackRegistrar.onAddOrUpdate(OPERATIONAL, FOO_PATH, (first, second) -> {
// No previous value but a new value: this is the "add" case. The body of this branch is not
// visible here; presumably it sets 'added' - TODO confirm.
118 if (first == null && second != null) {
121 return NextAction.UNREGISTER;
// Trigger the callback with a single write, then wait for the flag.
123 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
124 await().untilTrue(added);
// Registers an onAddOrUpdate() callback for FOO_PATH, writes FOO_DATA twice and waits until the
// 'updated' flag becomes true.
129 public void testAddOrUpdateUpdate() throws TransactionCommitFailedException {
130 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
131 AtomicBoolean updated = new AtomicBoolean(false);
132 dataTreeEventCallbackRegistrar.onAddOrUpdate(OPERATIONAL, FOO_PATH, (first, second) -> {
// Both a previous and a new value: this is the "update" case, after which the callback unregisters.
// Any statement between the condition and the return is not visible here; presumably it sets
// 'updated' - TODO confirm.
133 if (first != null && second != null) {
135 return NextAction.UNREGISTER;
// Otherwise (e.g. the initial add) stay registered so the second write can still be observed.
137 return NextAction.CALL_AGAIN;
// The same data is written twice so that the second write finds an existing entry.
139 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
140 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
141 await().untilTrue(updated);
// Shared scenario for the onAdd() tests: registers an onAdd() callback through a spied data broker,
// writes FOO_DATA, waits for the 'added' flag and then checks, depending on nextAction, whether the
// listener registration was closed.
//
// nextAction: the NextAction variant under test; it is compared against NextAction.UNREGISTER
//             below (the callback's return statement is not visible here).
145 private void checkAdd(NextAction nextAction) throws TransactionCommitFailedException {
146 DataBroker spiedDataBroker = spy(db);
// The registrar receives this mock registration so that close() calls can be verified.
148 ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
// Intercept listener registration: register the listener with the real broker, stub close() on
// the mock registration, and hand the mock back to the caller.
149 doAnswer(invocation -> {
150 ListenerRegistration<?> realReg = db.registerDataTreeChangeListener(
151 invocation.getArgument(0),
152 invocation.getArgument(1));
// The body of the close() answer is not visible here; presumably it closes realReg - TODO confirm.
153 doAnswer(ignored -> {
156 }).when(mockListenerReg).close();
157 return mockListenerReg;
158 }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
160 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
161 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
163 AtomicBoolean added = new AtomicBoolean(false);
164 dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> {
// The callback compares the delivered data with FOO_DATA. The matching branch's body is not
// visible (presumably it sets 'added'); the visible statements log and assert on a mismatch.
165 if (topLevelList.equals(FOO_DATA)) {
168 LOG.error("Expected: {} but was: {}", FOO_DATA, topLevelList);
169 assertThat(topLevelList).isEqualTo(FOO_DATA);
// Trigger the callback and wait until it has flagged the add.
174 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
175 await().untilTrue(added);
// For UNREGISTER the registrar must have closed the registration.
177 if (nextAction.equals(NextAction.UNREGISTER)) {
178 verify(mockListenerReg).close();
// Remaining visible steps (presumably the non-UNREGISTER branch): delete the entry, write it
// again, wait for 'added' once more and verify that the registration was never closed.
// NOTE(review): a reset of 'added' between the delete and the second write is not visible here.
181 db1.syncDelete(OPERATIONAL, FOO_PATH);
183 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
184 await().untilTrue(added);
185 verify(mockListenerReg, never()).close();
// Registers an onAdd() callback with a 50 ms timeout and never writes any data, then waits for the
// 'timedOut' flag and verifies that the listener registration was closed.
190 public void testAddWithTimeoutWhichExpires() throws InterruptedException {
191 DataBroker spiedDataBroker = spy(db);
// Registration is stubbed to return a mock without calling the real broker, so close() can be
// verified on the mock.
193 ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
194 doReturn(mockListenerReg).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
196 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
197 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
199 AtomicBoolean timedOut = new AtomicBoolean(false);
200 dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> { /* NOOP */ },
201 Duration.ofMillis(50), iid -> {
// The timeout callback checks that it is invoked with the identifier that was registered. The
// branch bodies are not visible here; presumably the matching branch sets 'timedOut'.
202 if (iid.equals(DataTreeIdentifier.create(OPERATIONAL, FOO_PATH))) {
209 await().untilTrue(timedOut);
210 verify(mockListenerReg).close();
// Registers an onAdd() callback with a 3000 ms timeout, writes FOO_DATA and then waits for 'added'
// to be true and for 'timedOut' to be false.
214 public void testAddWithTimeoutNeverHits() throws TransactionCommitFailedException, InterruptedException {
215 AtomicBoolean added = new AtomicBoolean(false);
216 AtomicBoolean timedOut = new AtomicBoolean(false);
217 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
// The add callback's body is not visible here; presumably it sets 'added'. The timeout callback
// sets 'timedOut'.
218 dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> {
220 }, Duration.ofMillis(3000), iid -> timedOut.set(true));
222 // This test is timing sensitive, and a too low timeout value (above), or slow machine, could make this fail :(
223 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
224 await().untilTrue(added);
// NOTE(review): untilFalse(timedOut) looks like it is satisfied as soon as the flag is observed
// false, i.e. possibly well before the 3000 ms timeout could have fired - confirm whether this
// really proves the timeout callback is never invoked.
225 await().untilFalse(timedOut);
// Registers an onAdd() callback that throws IllegalStateException("TEST") and expects, via
// logCaptureRule, that the last logged error message contains "Error invoking worker".
// Raw DataTreeChangeListener types are used for the mock and the captor, hence the suppression.
228 @SuppressWarnings({ "unchecked", "rawtypes" })
230 public void testExceptionInCallbackMustBeLogged() throws TransactionCommitFailedException, InterruptedException {
231 logCaptureRule.expectLastErrorMessageContains("Error invoking worker");
233 DataBroker spiedDataBroker = spy(db);
// The real broker gets mockListener registered in place of the registrar's own listener, so the
// test can observe when change delivery has finished.
234 final DataTreeChangeListener mockListener = mock(DataTreeChangeListener.class, "TestListener");
235 doAnswer(invocation -> db.registerDataTreeChangeListener(invocation.getArgument(0),
236 mockListener)).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
238 AtomicBoolean added = new AtomicBoolean(false);
239 DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
240 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
// The callback under test always throws; a statement before the throw is not visible here
// (presumably it sets 'added', which is awaited below).
241 dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH,
242 (Function<TopLevelList, NextAction>) topLevelList -> {
244 throw new IllegalStateException("TEST");
// Capture the listener that the registrar tried to register with the (spied) broker.
247 ArgumentCaptor<DataTreeChangeListener> realListener = ArgumentCaptor.forClass(DataTreeChangeListener.class);
248 verify(spiedDataBroker).registerDataTreeChangeListener(any(), realListener.capture());
// When the broker notifies mockListener, forward the changes to the captured listener and then
// flag completion (the statements surrounding these two calls are not visible here).
250 AtomicBoolean onDataTreeChangeDone = new AtomicBoolean(false);
251 doAnswer(invocation -> {
253 realListener.getValue().onDataTreeChanged(invocation.getArgument(0));
255 onDataTreeChangeDone.set(true);
258 }).when(mockListener).onDataTreeChanged(anyCollection());
// Trigger the change and wait for both the callback and the forwarding answer to have run.
260 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
261 await().untilTrue(added);
262 await().untilTrue(onDataTreeChangeDone);
// Runs the timeout-versus-change-notification scenario for NextAction.UNREGISTER; for that value the
// helper asserts that the timeout callback was not invoked and that the scheduled timeout was cancelled.
266 public void testTimeoutCallbackNotInvokedWhileHandlingChangeNotificationForUnregister() {
267 testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction.UNREGISTER);
// Runs the same scenario for NextAction.CALL_AGAIN. Despite the helper's "NotInvoked" name, its
// final assertions include a path expecting timedOut to be true and cancel() never to be called
// (presumably the branch taken for this value).
271 public void testTimeoutCallbackIsInvokedWhileHandlingChangeNotificationForCallAgain() {
272 testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction.CALL_AGAIN);
// Shared scenario: fires the captured timeout task while the onAdd() change callback is still
// running, then checks whether the timeout callback ran and whether the scheduled future was
// cancelled.
//
// nextAction: the NextAction variant under test; it selects the final assertions (the callback's
//             return statement is not visible here).
275 private void testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction nextAction) {
276 Duration timeout = Duration.ofMillis(10);
// A mock scheduler never runs the timeout task on its own; the test captures the task and runs
// it manually at a chosen moment.
278 ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
279 ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
280 doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
// Note: despite its name this is the mock scheduler wrapped by MoreExecutors.listeningDecorator.
282 ListeningScheduledExecutorService directExecutorService = MoreExecutors.listeningDecorator(mockScheduler);
284 DataTreeEventCallbackRegistrar callbackRegistrar =
285 new DataTreeEventCallbackRegistrarImpl(db, directExecutorService);
// inChangeCallback signals that the change callback has started; changeCallbackContinue holds the
// callback until the test releases it.
287 CountDownLatch inChangeCallback = new CountDownLatch(1);
288 CountDownLatch changeCallbackContinue = new CountDownLatch(1);
289 AtomicBoolean updated = new AtomicBoolean(false);
290 AtomicBoolean timedOut = new AtomicBoolean(false);
291 callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
292 inChangeCallback.countDown();
293 Uninterruptibles.awaitUninterruptibly(changeCallbackContinue);
295 // Sleep a bit for the timeout task - see below.
296 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
// The end of the callback is not visible here; presumably it sets 'updated' and returns
// nextAction - TODO confirm.
299 }, timeout, id -> timedOut.set(true));
// Grab the timeout task that the registrar scheduled with the configured timeout in milliseconds.
301 ArgumentCaptor<Runnable> timerTask = ArgumentCaptor.forClass(Runnable.class);
302 verify(mockScheduler).schedule(timerTask.capture(), eq(timeout.toMillis()), eq(TimeUnit.MILLISECONDS));
// Write the data through a write-only transaction to trigger the change callback.
304 new RetryingManagedNewTransactionRunner(db, 1).callWithNewWriteOnlyTransactionAndSubmit(
305 tx -> tx.mergeParentStructurePut(OPERATIONAL, FOO_PATH, FOO_DATA));
307 // Wait for the change notification callback to be invoked.
309 assertThat(Uninterruptibles.awaitUninterruptibly(inChangeCallback, 5, TimeUnit.SECONDS)).isTrue();
311 // Now artificially fire the timeout task on a separate thread.
// (The statement that starts the separate thread is not visible here.)
313 CountDownLatch timerTaskDone = new CountDownLatch(1);
315 // We have to tell the notification change callback to continue prior to invoking the timeout task as
316 // the latter should block internally in DataTreeEventCallbackRegistrarImpl while the change notification
317 // is still in progress. The change callback sleeps a bit to give the timeout task plenty of time to
318 // complete if it didn't block.
319 changeCallbackContinue.countDown();
320 timerTask.getValue().run();
321 timerTaskDone.countDown();
// Wait for the change callback to finish, then for the timeout task to finish.
324 await().timeout(5, TimeUnit.SECONDS).untilTrue(updated);
326 assertThat(Uninterruptibles.awaitUninterruptibly(timerTaskDone, 5, TimeUnit.SECONDS)).isTrue();
// UNREGISTER: the timeout callback must not have run and the scheduled timeout must be cancelled.
328 if (nextAction.equals(NextAction.UNREGISTER)) {
329 assertThat(timedOut.get()).isFalse();
330 verify(mockScheduledFuture).cancel(anyBoolean());
// Remaining assertions (presumably the non-UNREGISTER branch): the timeout callback ran and the
// scheduled timeout was never cancelled.
332 assertThat(timedOut.get()).isTrue();
333 verify(mockScheduledFuture, never()).cancel(anyBoolean());
// Runs the captured timeout task first, then writes the data and checks that the change callback
// did not mark 'updated'. Raw DataTreeChangeListener types are used for the mock and the captor,
// hence the suppression.
337 @SuppressWarnings({ "rawtypes", "unchecked" })
339 public void testChangeCallbackNotInvokedAfterTimeout() {
340 Duration timeout = Duration.ofMillis(10);
// A mock scheduler so that the timeout task only runs when the test invokes it explicitly.
342 ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
343 ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
344 doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
346 ListeningScheduledExecutorService directExecutorService = MoreExecutors.listeningDecorator(mockScheduler);
348 DataBroker spiedDataBroker = spy(db);
// Register mockListener with the real broker in place of the registrar's listener and return a
// fresh mock registration; the real registration is discarded, so the listener stays registered
// with the real broker for the rest of the test.
350 final DataTreeChangeListener mockListener = mock(DataTreeChangeListener.class);
351 doAnswer(invocation -> {
352 db.registerDataTreeChangeListener(invocation.getArgument(0), mockListener);
353 return mock(ListenerRegistration.class);
354 }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
356 DataTreeEventCallbackRegistrar callbackRegistrar =
357 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker, directExecutorService);
359 AtomicBoolean updated = new AtomicBoolean(false);
360 AtomicBoolean timedOut = new AtomicBoolean(false);
// A statement before the return is not visible here; presumably it sets 'updated', which is
// asserted to be false at the end.
361 callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
363 return NextAction.UNREGISTER;
364 }, timeout, id -> timedOut.set(true));
// Capture both the scheduled timeout task and the listener the registrar tried to register.
366 ArgumentCaptor<Runnable> timerTask = ArgumentCaptor.forClass(Runnable.class);
367 verify(mockScheduler).schedule(timerTask.capture(), eq(timeout.toMillis()), eq(TimeUnit.MILLISECONDS));
369 ArgumentCaptor<DataTreeChangeListener> realListener = ArgumentCaptor.forClass(DataTreeChangeListener.class);
370 verify(spiedDataBroker).registerDataTreeChangeListener(any(), realListener.capture());
// Fire the timeout before any data change has happened.
372 timerTask.getValue().run();
373 assertThat(timedOut.get()).isTrue();
// Forward a later change notification to the captured listener and flag when that is done
// (the statements surrounding these two calls are not visible here).
375 AtomicBoolean onDataTreeChangeDone = new AtomicBoolean(false);
376 doAnswer(invocation -> {
378 realListener.getValue().onDataTreeChanged(invocation.getArgument(0));
380 onDataTreeChangeDone.set(true);
383 }).when(mockListener).onDataTreeChanged(anyCollection());
// Write the data after the timeout; the change callback must not have marked 'updated'.
385 new RetryingManagedNewTransactionRunner(db, 1).callWithNewWriteOnlyTransactionAndSubmit(
386 tx -> tx.mergeParentStructurePut(OPERATIONAL, FOO_PATH, FOO_DATA));
388 await().untilTrue(onDataTreeChangeDone);
389 assertThat(updated.get()).isFalse();
// Makes the data change arrive while registerDataTreeChangeListener() has not yet returned to the
// registrar, then verifies that the registration is closed and the scheduled timeout is cancelled.
393 public void testChangeCallbackOccursImmediatelyAfterRegistration() {
394 ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
395 ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
396 doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
398 DataBroker spiedDataBroker = spy(db);
400 AtomicBoolean updated = new AtomicBoolean(false);
401 ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
// Inside the registration call: register the listener with the real broker, write the data and
// wait for 'updated' before handing the mock registration back to the registrar.
402 doAnswer(invocation -> {
403 DataTreeChangeListener<?> listener = invocation.getArgument(1);
404 db.registerDataTreeChangeListener(invocation.getArgument(0), listener);
405 db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
406 await().untilTrue(updated);
407 return mockListenerReg;
408 }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
// Here the mock scheduler is passed to the registrar directly, without a listening decorator.
410 DataTreeEventCallbackRegistrar callbackRegistrar =
411 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker, mockScheduler);
// A statement before the return is not visible here; presumably it sets 'updated'.
413 callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
415 return NextAction.UNREGISTER;
416 }, Duration.ofMillis(10), id -> { });
// Even though the callback ran during registration, the registration must end up closed and the
// timeout cancelled.
418 await().untilTrue(updated);
419 verify(mockListenerReg).close();
420 verify(mockScheduledFuture).cancel(anyBoolean());