Convert itm-impl to use mdsal-binding-util
[genius.git] / mdsalutil / mdsalutil-impl / src / test / java / org / opendaylight / genius / datastoreutils / listeners / tests / DataTreeEventCallbackRegistrarTest.java
1 /*
2  * Copyright (c) 2017 - 2018 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.datastoreutils.listeners.tests;
9
10 import static com.google.common.truth.Truth.assertThat;
11 import static org.awaitility.Awaitility.await;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyBoolean;
14 import static org.mockito.ArgumentMatchers.anyCollection;
15 import static org.mockito.ArgumentMatchers.anyLong;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.doAnswer;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.TOP_FOO_KEY;
24 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.USES_ONE_KEY;
25 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.complexUsesAugment;
26 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.path;
27 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.topLevelList;
28 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
29
30 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
31 import com.google.common.util.concurrent.MoreExecutors;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.time.Duration;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ScheduledFuture;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.function.Function;
40 import org.junit.Rule;
41 import org.junit.Test;
42 import org.mockito.ArgumentCaptor;
43 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
44 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar;
45 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar.NextAction;
46 import org.opendaylight.genius.datastoreutils.listeners.internal.DataTreeEventCallbackRegistrarImpl;
47 import org.opendaylight.infrautils.testutils.LogCaptureRule;
48 import org.opendaylight.infrautils.testutils.LogRule;
49 import org.opendaylight.mdsal.binding.api.DataBroker;
50 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
51 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
52 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest;
53 import org.opendaylight.mdsal.binding.util.Datastore;
54 import org.opendaylight.mdsal.binding.util.RetryingManagedNewTransactionRunner;
55 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
57 import org.opendaylight.yangtools.concepts.ListenerRegistration;
58 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 /**
63  * Test for {@link DataTreeEventCallbackRegistrarImpl}.
64  *
65  * @author Michael Vorburger.ch
66  */
67 public class DataTreeEventCallbackRegistrarTest {
68
69     // TODO add similar tests as for onAdd() also for onUpdate() and onDelete() and onAddOrUpdate()
70
71     private static final Logger LOG = LoggerFactory.getLogger(DataTreeEventCallbackRegistrarTest.class);
72
73     private static final InstanceIdentifier<TopLevelList> FOO_PATH = path(TOP_FOO_KEY);
74     private static final TopLevelList FOO_DATA = topLevelList(TOP_FOO_KEY, complexUsesAugment(USES_ONE_KEY));
75
76     public @Rule LogRule logRule = new LogRule();
77     public @Rule LogCaptureRule logCaptureRule = new LogCaptureRule();
78
79     private final DataBroker db;
80     private final SingleTransactionDataBroker db1;
81
82     public DataTreeEventCallbackRegistrarTest() throws Exception {
83         // Argument true to make sure we use the multi-threaded DataTreeChangeListenerExecutor
84         // because otherwise we hit a deadlock :( with this test!
85         /*ConstantSchemaAbstractDataBrokerTest dataBrokerTest = new ConstantSchemaAbstractDataBrokerTest(true) {
86             @Override
87             protected Set<YangModuleInfo> getModuleInfos() throws Exception {
88                 return ImmutableSet.of(BindingReflections.getModuleInfo(TwoLevelList.class),
89                         BindingReflections.getModuleInfo(TreeComplexUsesAugment.class));
90             }
91         };
92
93         dataBrokerTest.setup();
94         db = dataBrokerTest.getDataBroker();
95         db1 = new SingleTransactionDataBroker(db);*/
96         AbstractConcurrentDataBrokerTest dataBrokerTest =
97                 new AbstractConcurrentDataBrokerTest(true) {};
98         dataBrokerTest.setup();
99         db = dataBrokerTest.getDataBroker();
100         db1 = new SingleTransactionDataBroker(db);
101
102     }
103
104     @Test
105     public void testAddAndUnregister() throws TransactionCommitFailedException {
106         checkAdd(NextAction.UNREGISTER);
107     }
108
109     @Test
110     public void testAddAndKeepRegistered() throws TransactionCommitFailedException {
111         checkAdd(NextAction.CALL_AGAIN);
112     }
113
114     @Test
115     public void testAddOrUpdateAdd() throws TransactionCommitFailedException {
116         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
117         AtomicBoolean added = new AtomicBoolean(false);
118         dataTreeEventCallbackRegistrar.onAddOrUpdate(OPERATIONAL, FOO_PATH, (first, second) -> {
119             if (first == null && second != null) {
120                 added.set(true);
121             }
122             return NextAction.UNREGISTER;
123         });
124         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
125         await().untilTrue(added);
126
127     }
128
129     @Test
130     public void testAddOrUpdateUpdate() throws TransactionCommitFailedException {
131         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
132         AtomicBoolean updated = new AtomicBoolean(false);
133         dataTreeEventCallbackRegistrar.onAddOrUpdate(OPERATIONAL, FOO_PATH, (first, second) -> {
134             if (first != null && second != null) {
135                 updated.set(true);
136                 return NextAction.UNREGISTER;
137             }
138             return NextAction.CALL_AGAIN;
139         });
140         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
141         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
142         await().untilTrue(updated);
143
144     }
145
146     private void checkAdd(NextAction nextAction) throws TransactionCommitFailedException {
147         DataBroker spiedDataBroker = spy(db);
148
149         ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
150         doAnswer(invocation -> {
151             ListenerRegistration<?> realReg = db.registerDataTreeChangeListener(
152                 invocation.getArgument(0),
153                 invocation.getArgument(1));
154             doAnswer(ignored -> {
155                 realReg.close();
156                 return null;
157             }).when(mockListenerReg).close();
158             return mockListenerReg;
159         }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
160
161         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
162                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
163
164         AtomicBoolean added = new AtomicBoolean(false);
165         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> {
166             if (topLevelList.equals(FOO_DATA)) {
167                 added.set(true);
168             } else {
169                 LOG.error("Expected: {} but was: {}", FOO_DATA, topLevelList);
170                 assertThat(topLevelList).isEqualTo(FOO_DATA);
171             }
172             return nextAction;
173         });
174
175         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
176         await().untilTrue(added);
177
178         if (nextAction.equals(NextAction.UNREGISTER)) {
179             verify(mockListenerReg).close();
180         } else {
181             added.set(false);
182             db1.syncDelete(OPERATIONAL, FOO_PATH);
183
184             db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
185             await().untilTrue(added);
186             verify(mockListenerReg, never()).close();
187         }
188     }
189
190     @Test
191     public void testAddWithTimeoutWhichExpires() throws InterruptedException {
192         DataBroker spiedDataBroker = spy(db);
193
194         ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
195         doReturn(mockListenerReg).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
196
197         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
198                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
199
200         AtomicBoolean timedOut = new AtomicBoolean(false);
201         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> { /* NOOP */ },
202                 Duration.ofMillis(50), iid -> {
203                 if (iid.equals(DataTreeIdentifier.create(OPERATIONAL, FOO_PATH))) {
204                     timedOut.set(true);
205                 }
206             }
207         );
208
209         Thread.sleep(75);
210         await().untilTrue(timedOut);
211         verify(mockListenerReg).close();
212     }
213
214     @Test
215     public void testAddWithTimeoutNeverHits() throws TransactionCommitFailedException, InterruptedException {
216         AtomicBoolean added = new AtomicBoolean(false);
217         AtomicBoolean timedOut = new AtomicBoolean(false);
218         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
219         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> {
220             added.set(true);
221         }, Duration.ofMillis(3000), iid -> timedOut.set(true));
222
223         // This test is timing sensitive, and a too low timeout value (above), or slow machine, could make this fail :(
224         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
225         await().untilTrue(added);
226         await().untilFalse(timedOut);
227     }
228
229     @SuppressWarnings({ "unchecked", "rawtypes" })
230     @Test
231     public void testExceptionInCallbackMustBeLogged() throws TransactionCommitFailedException, InterruptedException {
232         logCaptureRule.expectLastErrorMessageContains("Error invoking worker");
233
234         DataBroker spiedDataBroker = spy(db);
235         final DataTreeChangeListener mockListener = mock(DataTreeChangeListener.class, "TestListener");
236         doAnswer(invocation -> db.registerDataTreeChangeListener(invocation.getArgument(0),
237                 mockListener)).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
238
239         AtomicBoolean added = new AtomicBoolean(false);
240         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
241                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
242         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH,
243             (Function<TopLevelList, NextAction>) topLevelList -> {
244                 added.set(true);
245                 throw new IllegalStateException("TEST");
246             });
247
248         ArgumentCaptor<DataTreeChangeListener> realListener = ArgumentCaptor.forClass(DataTreeChangeListener.class);
249         verify(spiedDataBroker).registerDataTreeChangeListener(any(), realListener.capture());
250
251         AtomicBoolean onDataTreeChangeDone = new AtomicBoolean(false);
252         doAnswer(invocation -> {
253             try {
254                 realListener.getValue().onDataTreeChanged(invocation.getArgument(0));
255             } finally {
256                 onDataTreeChangeDone.set(true);
257             }
258             return null;
259         }).when(mockListener).onDataTreeChanged(anyCollection());
260
261         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
262         await().untilTrue(added);
263         await().untilTrue(onDataTreeChangeDone);
264     }
265
266     @Test
267     public void testTimeoutCallbackNotInvokedWhileHandlingChangeNotificationForUnregister() {
268         testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction.UNREGISTER);
269     }
270
271     @Test
272     public void testTimeoutCallbackIsInvokedWhileHandlingChangeNotificationForCallAgain() {
273         testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction.CALL_AGAIN);
274     }
275
276     private void testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction nextAction) {
277         Duration timeout = Duration.ofMillis(10);
278
279         ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
280         ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
281         doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
282
283         ListeningScheduledExecutorService directExecutorService = MoreExecutors.listeningDecorator(mockScheduler);
284
285         DataTreeEventCallbackRegistrar callbackRegistrar =
286                 new DataTreeEventCallbackRegistrarImpl(db, directExecutorService);
287
288         CountDownLatch inChangeCallback = new CountDownLatch(1);
289         CountDownLatch changeCallbackContinue = new CountDownLatch(1);
290         AtomicBoolean updated = new AtomicBoolean(false);
291         AtomicBoolean timedOut = new AtomicBoolean(false);
292         callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
293             inChangeCallback.countDown();
294             Uninterruptibles.awaitUninterruptibly(changeCallbackContinue);
295
296             // Sleep a bit for the timeout task - see below.
297             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
298             updated.set(true);
299             return nextAction;
300         }, timeout, id -> timedOut.set(true));
301
302         ArgumentCaptor<Runnable> timerTask = ArgumentCaptor.forClass(Runnable.class);
303         verify(mockScheduler).schedule(timerTask.capture(), eq(timeout.toMillis()), eq(TimeUnit.MILLISECONDS));
304
305         new RetryingManagedNewTransactionRunner(db, 1).callWithNewWriteOnlyTransactionAndSubmit(Datastore.OPERATIONAL,
306             tx -> tx.mergeParentStructurePut(FOO_PATH, FOO_DATA));
307
308         // Wait for the change notification callback to be invoked.
309
310         assertThat(Uninterruptibles.awaitUninterruptibly(inChangeCallback, 5, TimeUnit.SECONDS)).isTrue();
311
312         // Now artificially fire the timeout task on a separate thread.
313
314         CountDownLatch timerTaskDone = new CountDownLatch(1);
315         new Thread(() -> {
316             // We have to tell the notification change callback to continue prior to invoking the timeout task as
317             // the latter should block internally in DataTreeEventCallbackRegistrarImpl while the change notification
318             // is still in progress. The change callback sleeps a bit to give the timeout task plenty of time to
319             // complete if it didn't block.
320             changeCallbackContinue.countDown();
321             timerTask.getValue().run();
322             timerTaskDone.countDown();
323         }).start();
324
325         await().timeout(5, TimeUnit.SECONDS).untilTrue(updated);
326
327         assertThat(Uninterruptibles.awaitUninterruptibly(timerTaskDone, 5, TimeUnit.SECONDS)).isTrue();
328
329         if (nextAction.equals(NextAction.UNREGISTER)) {
330             assertThat(timedOut.get()).isFalse();
331             verify(mockScheduledFuture).cancel(anyBoolean());
332         } else {
333             assertThat(timedOut.get()).isTrue();
334             verify(mockScheduledFuture, never()).cancel(anyBoolean());
335         }
336     }
337
338     @SuppressWarnings({ "rawtypes", "unchecked" })
339     @Test
340     public void testChangeCallbackNotInvokedAfterTimeout() {
341         Duration timeout = Duration.ofMillis(10);
342
343         ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
344         ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
345         doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
346
347         ListeningScheduledExecutorService directExecutorService = MoreExecutors.listeningDecorator(mockScheduler);
348
349         DataBroker spiedDataBroker = spy(db);
350
351         final DataTreeChangeListener mockListener = mock(DataTreeChangeListener.class);
352         doAnswer(invocation -> {
353             db.registerDataTreeChangeListener(invocation.getArgument(0), mockListener);
354             return mock(ListenerRegistration.class);
355         }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
356
357         DataTreeEventCallbackRegistrar callbackRegistrar =
358                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker, directExecutorService);
359
360         AtomicBoolean updated = new AtomicBoolean(false);
361         AtomicBoolean timedOut = new AtomicBoolean(false);
362         callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
363             updated.set(true);
364             return NextAction.UNREGISTER;
365         }, timeout, id -> timedOut.set(true));
366
367         ArgumentCaptor<Runnable> timerTask = ArgumentCaptor.forClass(Runnable.class);
368         verify(mockScheduler).schedule(timerTask.capture(), eq(timeout.toMillis()), eq(TimeUnit.MILLISECONDS));
369
370         ArgumentCaptor<DataTreeChangeListener> realListener = ArgumentCaptor.forClass(DataTreeChangeListener.class);
371         verify(spiedDataBroker).registerDataTreeChangeListener(any(), realListener.capture());
372
373         timerTask.getValue().run();
374         assertThat(timedOut.get()).isTrue();
375
376         AtomicBoolean onDataTreeChangeDone = new AtomicBoolean(false);
377         doAnswer(invocation -> {
378             try {
379                 realListener.getValue().onDataTreeChanged(invocation.getArgument(0));
380             } finally {
381                 onDataTreeChangeDone.set(true);
382             }
383             return null;
384         }).when(mockListener).onDataTreeChanged(anyCollection());
385
386         new RetryingManagedNewTransactionRunner(db, 1).callWithNewWriteOnlyTransactionAndSubmit(Datastore.OPERATIONAL,
387             tx -> tx.mergeParentStructurePut(FOO_PATH, FOO_DATA));
388
389         await().untilTrue(onDataTreeChangeDone);
390         assertThat(updated.get()).isFalse();
391     }
392
393     @Test
394     public void testChangeCallbackOccursImmediatelyAfterRegistration() {
395         ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
396         ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
397         doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
398
399         DataBroker spiedDataBroker = spy(db);
400
401         AtomicBoolean updated = new AtomicBoolean(false);
402         ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
403         doAnswer(invocation -> {
404             DataTreeChangeListener<?> listener = invocation.getArgument(1);
405             db.registerDataTreeChangeListener(invocation.getArgument(0), listener);
406             db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
407             await().untilTrue(updated);
408             return mockListenerReg;
409         }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
410
411         DataTreeEventCallbackRegistrar callbackRegistrar =
412                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker, mockScheduler);
413
414         callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
415             updated.set(true);
416             return NextAction.UNREGISTER;
417         }, Duration.ofMillis(10), id -> { });
418
419         await().untilTrue(updated);
420         verify(mockListenerReg).close();
421         verify(mockScheduledFuture).cancel(anyBoolean());
422     }
423 }