71949af4b1d14b599d59d76456a4d720432dfc09
[genius.git] / mdsalutil / mdsalutil-impl / src / test / java / org / opendaylight / genius / datastoreutils / listeners / tests / DataTreeEventCallbackRegistrarTest.java
1 /*
2  * Copyright (c) 2017 - 2018 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.datastoreutils.listeners.tests;
9
10 import static com.google.common.truth.Truth.assertThat;
11 import static org.awaitility.Awaitility.await;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyBoolean;
14 import static org.mockito.ArgumentMatchers.anyCollection;
15 import static org.mockito.ArgumentMatchers.anyLong;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.doAnswer;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.TOP_FOO_KEY;
24 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.USES_ONE_KEY;
25 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.complexUsesAugment;
26 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.path;
27 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.topLevelList;
28 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
29
30 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
31 import com.google.common.util.concurrent.MoreExecutors;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.time.Duration;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ScheduledFuture;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.function.Function;
40 import org.junit.Rule;
41 import org.junit.Test;
42 import org.mockito.ArgumentCaptor;
43 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
44 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar;
45 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar.NextAction;
46 import org.opendaylight.genius.datastoreutils.listeners.internal.DataTreeEventCallbackRegistrarImpl;
47 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
48 import org.opendaylight.infrautils.testutils.LogCaptureRule;
49 import org.opendaylight.infrautils.testutils.LogRule;
50 import org.opendaylight.mdsal.binding.api.DataBroker;
51 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
52 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
53 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest;
54 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
56 import org.opendaylight.yangtools.concepts.ListenerRegistration;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 /**
62  * Test for {@link DataTreeEventCallbackRegistrarImpl}.
63  *
64  * @author Michael Vorburger.ch
65  */
66 public class DataTreeEventCallbackRegistrarTest {
67
68     // TODO add similar tests as for onAdd() also for onUpdate() and onDelete() and onAddOrUpdate()
69
70     private static final Logger LOG = LoggerFactory.getLogger(DataTreeEventCallbackRegistrarTest.class);
71
72     private static final InstanceIdentifier<TopLevelList> FOO_PATH = path(TOP_FOO_KEY);
73     private static final TopLevelList FOO_DATA = topLevelList(TOP_FOO_KEY, complexUsesAugment(USES_ONE_KEY));
74
75     public @Rule LogRule logRule = new LogRule();
76     public @Rule LogCaptureRule logCaptureRule = new LogCaptureRule();
77
78     private final DataBroker db;
79     private final SingleTransactionDataBroker db1;
80
81     public DataTreeEventCallbackRegistrarTest() throws Exception {
82         // Argument true to make sure we use the multi-threaded DataTreeChangeListenerExecutor
83         // because otherwise we hit a deadlock :( with this test!
84         /*ConstantSchemaAbstractDataBrokerTest dataBrokerTest = new ConstantSchemaAbstractDataBrokerTest(true) {
85             @Override
86             protected Set<YangModuleInfo> getModuleInfos() throws Exception {
87                 return ImmutableSet.of(BindingReflections.getModuleInfo(TwoLevelList.class),
88                         BindingReflections.getModuleInfo(TreeComplexUsesAugment.class));
89             }
90         };
91
92         dataBrokerTest.setup();
93         db = dataBrokerTest.getDataBroker();
94         db1 = new SingleTransactionDataBroker(db);*/
95         AbstractConcurrentDataBrokerTest dataBrokerTest =
96                 new AbstractConcurrentDataBrokerTest(true) {};
97         dataBrokerTest.setup();
98         db = dataBrokerTest.getDataBroker();
99         db1 = new SingleTransactionDataBroker(db);
100
101     }
102
103     @Test
104     public void testAddAndUnregister() throws TransactionCommitFailedException {
105         checkAdd(NextAction.UNREGISTER);
106     }
107
108     @Test
109     public void testAddAndKeepRegistered() throws TransactionCommitFailedException {
110         checkAdd(NextAction.CALL_AGAIN);
111     }
112
113     @Test
114     public void testAddOrUpdateAdd() throws TransactionCommitFailedException {
115         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
116         AtomicBoolean added = new AtomicBoolean(false);
117         dataTreeEventCallbackRegistrar.onAddOrUpdate(OPERATIONAL, FOO_PATH, (first, second) -> {
118             if (first == null && second != null) {
119                 added.set(true);
120             }
121             return NextAction.UNREGISTER;
122         });
123         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
124         await().untilTrue(added);
125
126     }
127
128     @Test
129     public void testAddOrUpdateUpdate() throws TransactionCommitFailedException {
130         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
131         AtomicBoolean updated = new AtomicBoolean(false);
132         dataTreeEventCallbackRegistrar.onAddOrUpdate(OPERATIONAL, FOO_PATH, (first, second) -> {
133             if (first != null && second != null) {
134                 updated.set(true);
135                 return NextAction.UNREGISTER;
136             }
137             return NextAction.CALL_AGAIN;
138         });
139         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
140         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
141         await().untilTrue(updated);
142
143     }
144
145     private void checkAdd(NextAction nextAction) throws TransactionCommitFailedException {
146         DataBroker spiedDataBroker = spy(db);
147
148         ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
149         doAnswer(invocation -> {
150             ListenerRegistration<?> realReg = db.registerDataTreeChangeListener(
151                 invocation.getArgument(0),
152                 invocation.getArgument(1));
153             doAnswer(ignored -> {
154                 realReg.close();
155                 return null;
156             }).when(mockListenerReg).close();
157             return mockListenerReg;
158         }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
159
160         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
161                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
162
163         AtomicBoolean added = new AtomicBoolean(false);
164         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> {
165             if (topLevelList.equals(FOO_DATA)) {
166                 added.set(true);
167             } else {
168                 LOG.error("Expected: {} but was: {}", FOO_DATA, topLevelList);
169                 assertThat(topLevelList).isEqualTo(FOO_DATA);
170             }
171             return nextAction;
172         });
173
174         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
175         await().untilTrue(added);
176
177         if (nextAction.equals(NextAction.UNREGISTER)) {
178             verify(mockListenerReg).close();
179         } else {
180             added.set(false);
181             db1.syncDelete(OPERATIONAL, FOO_PATH);
182
183             db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
184             await().untilTrue(added);
185             verify(mockListenerReg, never()).close();
186         }
187     }
188
189     @Test
190     public void testAddWithTimeoutWhichExpires() throws InterruptedException {
191         DataBroker spiedDataBroker = spy(db);
192
193         ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
194         doReturn(mockListenerReg).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
195
196         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
197                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
198
199         AtomicBoolean timedOut = new AtomicBoolean(false);
200         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> { /* NOOP */ },
201                 Duration.ofMillis(50), iid -> {
202                 if (iid.equals(DataTreeIdentifier.create(OPERATIONAL, FOO_PATH))) {
203                     timedOut.set(true);
204                 }
205             }
206         );
207
208         Thread.sleep(75);
209         await().untilTrue(timedOut);
210         verify(mockListenerReg).close();
211     }
212
213     @Test
214     public void testAddWithTimeoutNeverHits() throws TransactionCommitFailedException, InterruptedException {
215         AtomicBoolean added = new AtomicBoolean(false);
216         AtomicBoolean timedOut = new AtomicBoolean(false);
217         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
218         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> {
219             added.set(true);
220         }, Duration.ofMillis(3000), iid -> timedOut.set(true));
221
222         // This test is timing sensitive, and a too low timeout value (above), or slow machine, could make this fail :(
223         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
224         await().untilTrue(added);
225         await().untilFalse(timedOut);
226     }
227
228     @SuppressWarnings({ "unchecked", "rawtypes" })
229     @Test
230     public void testExceptionInCallbackMustBeLogged() throws TransactionCommitFailedException, InterruptedException {
231         logCaptureRule.expectLastErrorMessageContains("Error invoking worker");
232
233         DataBroker spiedDataBroker = spy(db);
234         final DataTreeChangeListener mockListener = mock(DataTreeChangeListener.class, "TestListener");
235         doAnswer(invocation -> db.registerDataTreeChangeListener(invocation.getArgument(0),
236                 mockListener)).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
237
238         AtomicBoolean added = new AtomicBoolean(false);
239         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
240                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
241         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH,
242             (Function<TopLevelList, NextAction>) topLevelList -> {
243                 added.set(true);
244                 throw new IllegalStateException("TEST");
245             });
246
247         ArgumentCaptor<DataTreeChangeListener> realListener = ArgumentCaptor.forClass(DataTreeChangeListener.class);
248         verify(spiedDataBroker).registerDataTreeChangeListener(any(), realListener.capture());
249
250         AtomicBoolean onDataTreeChangeDone = new AtomicBoolean(false);
251         doAnswer(invocation -> {
252             try {
253                 realListener.getValue().onDataTreeChanged(invocation.getArgument(0));
254             } finally {
255                 onDataTreeChangeDone.set(true);
256             }
257             return null;
258         }).when(mockListener).onDataTreeChanged(anyCollection());
259
260         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
261         await().untilTrue(added);
262         await().untilTrue(onDataTreeChangeDone);
263     }
264
265     @Test
266     public void testTimeoutCallbackNotInvokedWhileHandlingChangeNotificationForUnregister() {
267         testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction.UNREGISTER);
268     }
269
270     @Test
271     public void testTimeoutCallbackIsInvokedWhileHandlingChangeNotificationForCallAgain() {
272         testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction.CALL_AGAIN);
273     }
274
275     private void testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction nextAction) {
276         Duration timeout = Duration.ofMillis(10);
277
278         ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
279         ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
280         doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
281
282         ListeningScheduledExecutorService directExecutorService = MoreExecutors.listeningDecorator(mockScheduler);
283
284         DataTreeEventCallbackRegistrar callbackRegistrar =
285                 new DataTreeEventCallbackRegistrarImpl(db, directExecutorService);
286
287         CountDownLatch inChangeCallback = new CountDownLatch(1);
288         CountDownLatch changeCallbackContinue = new CountDownLatch(1);
289         AtomicBoolean updated = new AtomicBoolean(false);
290         AtomicBoolean timedOut = new AtomicBoolean(false);
291         callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
292             inChangeCallback.countDown();
293             Uninterruptibles.awaitUninterruptibly(changeCallbackContinue);
294
295             // Sleep a bit for the timeout task - see below.
296             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
297             updated.set(true);
298             return nextAction;
299         }, timeout, id -> timedOut.set(true));
300
301         ArgumentCaptor<Runnable> timerTask = ArgumentCaptor.forClass(Runnable.class);
302         verify(mockScheduler).schedule(timerTask.capture(), eq(timeout.toMillis()), eq(TimeUnit.MILLISECONDS));
303
304         new RetryingManagedNewTransactionRunner(db, 1).callWithNewWriteOnlyTransactionAndSubmit(
305             tx -> tx.mergeParentStructurePut(OPERATIONAL, FOO_PATH, FOO_DATA));
306
307         // Wait for the change notification callback to be invoked.
308
309         assertThat(Uninterruptibles.awaitUninterruptibly(inChangeCallback, 5, TimeUnit.SECONDS)).isTrue();
310
311         // Now artificially fire the timeout task on a separate thread.
312
313         CountDownLatch timerTaskDone = new CountDownLatch(1);
314         new Thread(() -> {
315             // We have to tell the notification change callback to continue prior to invoking the timeout task as
316             // the latter should block internally in DataTreeEventCallbackRegistrarImpl while the change notification
317             // is still in progress. The change callback sleeps a bit to give the timeout task plenty of time to
318             // complete if it didn't block.
319             changeCallbackContinue.countDown();
320             timerTask.getValue().run();
321             timerTaskDone.countDown();
322         }).start();
323
324         await().timeout(5, TimeUnit.SECONDS).untilTrue(updated);
325
326         assertThat(Uninterruptibles.awaitUninterruptibly(timerTaskDone, 5, TimeUnit.SECONDS)).isTrue();
327
328         if (nextAction.equals(NextAction.UNREGISTER)) {
329             assertThat(timedOut.get()).isFalse();
330             verify(mockScheduledFuture).cancel(anyBoolean());
331         } else {
332             assertThat(timedOut.get()).isTrue();
333             verify(mockScheduledFuture, never()).cancel(anyBoolean());
334         }
335     }
336
337     @SuppressWarnings({ "rawtypes", "unchecked" })
338     @Test
339     public void testChangeCallbackNotInvokedAfterTimeout() {
340         Duration timeout = Duration.ofMillis(10);
341
342         ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
343         ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
344         doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
345
346         ListeningScheduledExecutorService directExecutorService = MoreExecutors.listeningDecorator(mockScheduler);
347
348         DataBroker spiedDataBroker = spy(db);
349
350         final DataTreeChangeListener mockListener = mock(DataTreeChangeListener.class);
351         doAnswer(invocation -> {
352             db.registerDataTreeChangeListener(invocation.getArgument(0), mockListener);
353             return mock(ListenerRegistration.class);
354         }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
355
356         DataTreeEventCallbackRegistrar callbackRegistrar =
357                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker, directExecutorService);
358
359         AtomicBoolean updated = new AtomicBoolean(false);
360         AtomicBoolean timedOut = new AtomicBoolean(false);
361         callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
362             updated.set(true);
363             return NextAction.UNREGISTER;
364         }, timeout, id -> timedOut.set(true));
365
366         ArgumentCaptor<Runnable> timerTask = ArgumentCaptor.forClass(Runnable.class);
367         verify(mockScheduler).schedule(timerTask.capture(), eq(timeout.toMillis()), eq(TimeUnit.MILLISECONDS));
368
369         ArgumentCaptor<DataTreeChangeListener> realListener = ArgumentCaptor.forClass(DataTreeChangeListener.class);
370         verify(spiedDataBroker).registerDataTreeChangeListener(any(), realListener.capture());
371
372         timerTask.getValue().run();
373         assertThat(timedOut.get()).isTrue();
374
375         AtomicBoolean onDataTreeChangeDone = new AtomicBoolean(false);
376         doAnswer(invocation -> {
377             try {
378                 realListener.getValue().onDataTreeChanged(invocation.getArgument(0));
379             } finally {
380                 onDataTreeChangeDone.set(true);
381             }
382             return null;
383         }).when(mockListener).onDataTreeChanged(anyCollection());
384
385         new RetryingManagedNewTransactionRunner(db, 1).callWithNewWriteOnlyTransactionAndSubmit(
386             tx -> tx.mergeParentStructurePut(OPERATIONAL, FOO_PATH, FOO_DATA));
387
388         await().untilTrue(onDataTreeChangeDone);
389         assertThat(updated.get()).isFalse();
390     }
391
392     @Test
393     public void testChangeCallbackOccursImmediatelyAfterRegistration() {
394         ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
395         ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
396         doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
397
398         DataBroker spiedDataBroker = spy(db);
399
400         AtomicBoolean updated = new AtomicBoolean(false);
401         ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
402         doAnswer(invocation -> {
403             DataTreeChangeListener<?> listener = invocation.getArgument(1);
404             db.registerDataTreeChangeListener(invocation.getArgument(0), listener);
405             db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
406             await().untilTrue(updated);
407             return mockListenerReg;
408         }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
409
410         DataTreeEventCallbackRegistrar callbackRegistrar =
411                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker, mockScheduler);
412
413         callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
414             updated.set(true);
415             return NextAction.UNREGISTER;
416         }, Duration.ofMillis(10), id -> { });
417
418         await().untilTrue(updated);
419         verify(mockListenerReg).close();
420         verify(mockScheduledFuture).cancel(anyBoolean());
421     }
422 }