MRI version bump for Aluminium
[genius.git] / mdsalutil / mdsalutil-impl / src / test / java / org / opendaylight / genius / datastoreutils / listeners / tests / DataTreeEventCallbackRegistrarTest.java
1 /*
2  * Copyright (c) 2017 - 2018 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.datastoreutils.listeners.tests;
9
10 import static com.google.common.truth.Truth.assertThat;
11 import static org.awaitility.Awaitility.await;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Matchers.anyBoolean;
14 import static org.mockito.Matchers.anyCollection;
15 import static org.mockito.Matchers.anyLong;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.doAnswer;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.TOP_FOO_KEY;
24 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.USES_ONE_KEY;
25 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.complexUsesAugment;
26 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.path;
27 import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.topLevelList;
28 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
29
30 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
31 import com.google.common.util.concurrent.MoreExecutors;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.time.Duration;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ScheduledFuture;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.function.Function;
40 import org.junit.Rule;
41 import org.junit.Test;
42 import org.mockito.ArgumentCaptor;
43 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
44 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar;
45 import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar.NextAction;
46 import org.opendaylight.genius.datastoreutils.listeners.internal.DataTreeEventCallbackRegistrarImpl;
47 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
48 import org.opendaylight.infrautils.testutils.LogCaptureRule;
49 import org.opendaylight.infrautils.testutils.LogRule;
50 import org.opendaylight.mdsal.binding.api.DataBroker;
51 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
52 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
53 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest;
54 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
56 import org.opendaylight.yangtools.concepts.ListenerRegistration;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 /**
62  * Test for {@link DataTreeEventCallbackRegistrarImpl}.
63  *
64  * @author Michael Vorburger.ch
65  */
66 public class DataTreeEventCallbackRegistrarTest {
67
68     // TODO add similar tests as for onAdd() also for onUpdate() and onDelete() and onAddOrUpdate()
69
70     private static final Logger LOG = LoggerFactory.getLogger(DataTreeEventCallbackRegistrarTest.class);
71
72     private static final InstanceIdentifier<TopLevelList> FOO_PATH = path(TOP_FOO_KEY);
73     private static final TopLevelList FOO_DATA = topLevelList(TOP_FOO_KEY, complexUsesAugment(USES_ONE_KEY));
74
75     public @Rule LogRule logRule = new LogRule();
76     public @Rule LogCaptureRule logCaptureRule = new LogCaptureRule();
77
78     private final DataBroker db;
79     private final SingleTransactionDataBroker db1;
80
81     public DataTreeEventCallbackRegistrarTest() throws Exception {
82         // Argument true to make sure we use the multi-threaded DataTreeChangeListenerExecutor
83         // because otherwise we hit a deadlock :( with this test!
84         /*ConstantSchemaAbstractDataBrokerTest dataBrokerTest = new ConstantSchemaAbstractDataBrokerTest(true) {
85             @Override
86             protected Set<YangModuleInfo> getModuleInfos() throws Exception {
87                 return ImmutableSet.of(BindingReflections.getModuleInfo(TwoLevelList.class),
88                         BindingReflections.getModuleInfo(TreeComplexUsesAugment.class));
89             }
90         };
91
92         dataBrokerTest.setup();
93         db = dataBrokerTest.getDataBroker();
94         db1 = new SingleTransactionDataBroker(db);*/
95         AbstractConcurrentDataBrokerTest dataBrokerTest =
96                 new AbstractConcurrentDataBrokerTest(true) {};
97         dataBrokerTest.setup();
98         db = dataBrokerTest.getDataBroker();
99         db1 = new SingleTransactionDataBroker(db);
100
101     }
102
103     @Test
104     public void testAddAndUnregister() throws TransactionCommitFailedException {
105         checkAdd(NextAction.UNREGISTER);
106     }
107
108     @Test
109     public void testAddAndKeepRegistered() throws TransactionCommitFailedException {
110         checkAdd(NextAction.CALL_AGAIN);
111     }
112
113     @Test
114     public void testAddOrUpdateAdd() throws TransactionCommitFailedException {
115         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
116         AtomicBoolean added = new AtomicBoolean(false);
117         dataTreeEventCallbackRegistrar.onAddOrUpdate(OPERATIONAL, FOO_PATH, (first, second) -> {
118             if (first == null && second != null) {
119                 added.set(true);
120             }
121             return NextAction.UNREGISTER;
122         });
123         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
124         await().untilTrue(added);
125
126     }
127
128     @Test
129     public void testAddOrUpdateUpdate() throws TransactionCommitFailedException {
130         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
131         AtomicBoolean updated = new AtomicBoolean(false);
132         dataTreeEventCallbackRegistrar.onAddOrUpdate(OPERATIONAL, FOO_PATH, (first, second) -> {
133             if (first != null && second != null) {
134                 updated.set(true);
135                 return NextAction.UNREGISTER;
136             }
137             return NextAction.CALL_AGAIN;
138         });
139         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
140         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
141         await().untilTrue(updated);
142
143     }
144
145     @SuppressWarnings("unchecked")
146     private void checkAdd(NextAction nextAction) throws TransactionCommitFailedException {
147         DataBroker spiedDataBroker = spy(db);
148
149         ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
150         doAnswer(invocation -> {
151             ListenerRegistration<?> realReg = db.registerDataTreeChangeListener(
152                 invocation.getArgument(0),
153                 invocation.getArgument(1));
154             doAnswer(ignored -> {
155                 realReg.close();
156                 return null;
157             }).when(mockListenerReg).close();
158             return mockListenerReg;
159         }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
160
161         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
162                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
163
164         AtomicBoolean added = new AtomicBoolean(false);
165         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> {
166             if (topLevelList.equals(FOO_DATA)) {
167                 added.set(true);
168             } else {
169                 LOG.error("Expected: {} but was: {}", FOO_DATA, topLevelList);
170                 assertThat(topLevelList).isEqualTo(FOO_DATA);
171             }
172             return nextAction;
173         });
174
175         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
176         await().untilTrue(added);
177
178         if (nextAction.equals(NextAction.UNREGISTER)) {
179             verify(mockListenerReg).close();
180         } else {
181             added.set(false);
182             db1.syncDelete(OPERATIONAL, FOO_PATH);
183
184             db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
185             await().untilTrue(added);
186             verify(mockListenerReg, never()).close();
187         }
188     }
189
190     @Test
191     public void testAddWithTimeoutWhichExpires() throws InterruptedException {
192         DataBroker spiedDataBroker = spy(db);
193
194         ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
195         doReturn(mockListenerReg).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
196
197         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
198                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
199
200         AtomicBoolean timedOut = new AtomicBoolean(false);
201         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> { /* NOOP */ },
202                 Duration.ofMillis(50), iid -> {
203                 if (iid.equals(DataTreeIdentifier.create(OPERATIONAL, FOO_PATH))) {
204                     timedOut.set(true);
205                 }
206             }
207         );
208
209         Thread.sleep(75);
210         await().untilTrue(timedOut);
211         verify(mockListenerReg).close();
212     }
213
214     @Test
215     public void testAddWithTimeoutNeverHits() throws TransactionCommitFailedException, InterruptedException {
216         AtomicBoolean added = new AtomicBoolean(false);
217         AtomicBoolean timedOut = new AtomicBoolean(false);
218         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar = new DataTreeEventCallbackRegistrarImpl(db);
219         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, topLevelList -> {
220             added.set(true);
221         }, Duration.ofMillis(3000), iid -> timedOut.set(true));
222
223         // This test is timing sensitive, and a too low timeout value (above), or slow machine, could make this fail :(
224         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
225         await().untilTrue(added);
226         await().untilFalse(timedOut);
227     }
228
229     @SuppressWarnings({ "unchecked", "rawtypes" })
230     @Test
231     public void testExceptionInCallbackMustBeLogged() throws TransactionCommitFailedException, InterruptedException {
232         logCaptureRule.expectLastErrorMessageContains("Error invoking worker");
233
234         DataBroker spiedDataBroker = spy(db);
235         final DataTreeChangeListener mockListener = mock(DataTreeChangeListener.class, "TestListener");
236         doAnswer(invocation -> db.registerDataTreeChangeListener(invocation.getArgument(0),
237                 mockListener)).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
238
239         AtomicBoolean added = new AtomicBoolean(false);
240         DataTreeEventCallbackRegistrar dataTreeEventCallbackRegistrar =
241                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker);
242         dataTreeEventCallbackRegistrar.onAdd(OPERATIONAL, FOO_PATH,
243             (Function<TopLevelList, NextAction>) topLevelList -> {
244                 added.set(true);
245                 throw new IllegalStateException("TEST");
246             });
247
248         ArgumentCaptor<DataTreeChangeListener> realListener = ArgumentCaptor.forClass(DataTreeChangeListener.class);
249         verify(spiedDataBroker).registerDataTreeChangeListener(any(), realListener.capture());
250
251         AtomicBoolean onDataTreeChangeDone = new AtomicBoolean(false);
252         doAnswer(invocation -> {
253             try {
254                 realListener.getValue().onDataTreeChanged(invocation.getArgument(0));
255             } finally {
256                 onDataTreeChangeDone.set(true);
257             }
258             return null;
259         }).when(mockListener).onDataTreeChanged(anyCollection());
260
261         db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
262         await().untilTrue(added);
263         await().untilTrue(onDataTreeChangeDone);
264     }
265
266     @Test
267     public void testTimeoutCallbackNotInvokedWhileHandlingChangeNotificationForUnregister() {
268         testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction.UNREGISTER);
269     }
270
271     @Test
272     public void testTimeoutCallbackIsInvokedWhileHandlingChangeNotificationForCallAgain() {
273         testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction.CALL_AGAIN);
274     }
275
276     private void testTimeoutCallbackNotInvokedWhileHandlingChangeNotification(NextAction nextAction) {
277         Duration timeout = Duration.ofMillis(10);
278
279         ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
280         ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
281         doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
282
283         ListeningScheduledExecutorService directExecutorService = MoreExecutors.listeningDecorator(mockScheduler);
284
285         DataTreeEventCallbackRegistrar callbackRegistrar =
286                 new DataTreeEventCallbackRegistrarImpl(db, directExecutorService);
287
288         CountDownLatch inChangeCallback = new CountDownLatch(1);
289         CountDownLatch changeCallbackContinue = new CountDownLatch(1);
290         AtomicBoolean updated = new AtomicBoolean(false);
291         AtomicBoolean timedOut = new AtomicBoolean(false);
292         callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
293             inChangeCallback.countDown();
294             Uninterruptibles.awaitUninterruptibly(changeCallbackContinue);
295
296             // Sleep a bit for the timeout task - see below.
297             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
298             updated.set(true);
299             return nextAction;
300         }, timeout, id -> timedOut.set(true));
301
302         ArgumentCaptor<Runnable> timerTask = ArgumentCaptor.forClass(Runnable.class);
303         verify(mockScheduler).schedule(timerTask.capture(), eq(timeout.toMillis()), eq(TimeUnit.MILLISECONDS));
304
305         new RetryingManagedNewTransactionRunner(db, 1).callWithNewWriteOnlyTransactionAndSubmit(
306             tx -> tx.mergeParentStructurePut(OPERATIONAL, FOO_PATH, FOO_DATA));
307
308         // Wait for the change notification callback to be invoked.
309
310         assertThat(Uninterruptibles.awaitUninterruptibly(inChangeCallback, 5, TimeUnit.SECONDS)).isTrue();
311
312         // Now artificially fire the timeout task on a separate thread.
313
314         CountDownLatch timerTaskDone = new CountDownLatch(1);
315         new Thread(() -> {
316             // We have to tell the notification change callback to continue prior to invoking the timeout task as
317             // the latter should block internally in DataTreeEventCallbackRegistrarImpl while the change notification
318             // is still in progress. The change callback sleeps a bit to give the timeout task plenty of time to
319             // complete if it didn't block.
320             changeCallbackContinue.countDown();
321             timerTask.getValue().run();
322             timerTaskDone.countDown();
323         }).start();
324
325         await().timeout(5, TimeUnit.SECONDS).untilTrue(updated);
326
327         assertThat(Uninterruptibles.awaitUninterruptibly(timerTaskDone, 5, TimeUnit.SECONDS)).isTrue();
328
329         if (nextAction.equals(NextAction.UNREGISTER)) {
330             assertThat(timedOut.get()).isFalse();
331             verify(mockScheduledFuture).cancel(anyBoolean());
332         } else {
333             assertThat(timedOut.get()).isTrue();
334             verify(mockScheduledFuture, never()).cancel(anyBoolean());
335         }
336     }
337
338     @SuppressWarnings({ "rawtypes", "unchecked" })
339     @Test
340     public void testChangeCallbackNotInvokedAfterTimeout() {
341         Duration timeout = Duration.ofMillis(10);
342
343         ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
344         ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
345         doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
346
347         ListeningScheduledExecutorService directExecutorService = MoreExecutors.listeningDecorator(mockScheduler);
348
349         DataBroker spiedDataBroker = spy(db);
350
351         final DataTreeChangeListener mockListener = mock(DataTreeChangeListener.class);
352         doAnswer(invocation -> {
353             db.registerDataTreeChangeListener(invocation.getArgument(0), mockListener);
354             return mock(ListenerRegistration.class);
355         }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
356
357         DataTreeEventCallbackRegistrar callbackRegistrar =
358                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker, directExecutorService);
359
360         AtomicBoolean updated = new AtomicBoolean(false);
361         AtomicBoolean timedOut = new AtomicBoolean(false);
362         callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
363             updated.set(true);
364             return NextAction.UNREGISTER;
365         }, timeout, id -> timedOut.set(true));
366
367         ArgumentCaptor<Runnable> timerTask = ArgumentCaptor.forClass(Runnable.class);
368         verify(mockScheduler).schedule(timerTask.capture(), eq(timeout.toMillis()), eq(TimeUnit.MILLISECONDS));
369
370         ArgumentCaptor<DataTreeChangeListener> realListener = ArgumentCaptor.forClass(DataTreeChangeListener.class);
371         verify(spiedDataBroker).registerDataTreeChangeListener(any(), realListener.capture());
372
373         timerTask.getValue().run();
374         assertThat(timedOut.get()).isTrue();
375
376         AtomicBoolean onDataTreeChangeDone = new AtomicBoolean(false);
377         doAnswer(invocation -> {
378             try {
379                 realListener.getValue().onDataTreeChanged(invocation.getArgument(0));
380             } finally {
381                 onDataTreeChangeDone.set(true);
382             }
383             return null;
384         }).when(mockListener).onDataTreeChanged(anyCollection());
385
386         new RetryingManagedNewTransactionRunner(db, 1).callWithNewWriteOnlyTransactionAndSubmit(
387             tx -> tx.mergeParentStructurePut(OPERATIONAL, FOO_PATH, FOO_DATA));
388
389         await().untilTrue(onDataTreeChangeDone);
390         assertThat(updated.get()).isFalse();
391     }
392
393     @SuppressWarnings("unchecked")
394     @Test
395     public void testChangeCallbackOccursImmediatelyAfterRegistration() {
396         ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
397         ScheduledFuture<?> mockScheduledFuture = mock(ScheduledFuture.class);
398         doReturn(mockScheduledFuture).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
399
400         DataBroker spiedDataBroker = spy(db);
401
402         AtomicBoolean updated = new AtomicBoolean(false);
403         ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
404         doAnswer(invocation -> {
405             DataTreeChangeListener<?> listener = invocation.getArgument(1);
406             db.registerDataTreeChangeListener(invocation.getArgument(0), listener);
407             db1.syncWrite(OPERATIONAL, FOO_PATH, FOO_DATA);
408             await().untilTrue(updated);
409             return mockListenerReg;
410         }).when(spiedDataBroker).registerDataTreeChangeListener(any(), any());
411
412         DataTreeEventCallbackRegistrar callbackRegistrar =
413                 new DataTreeEventCallbackRegistrarImpl(spiedDataBroker, mockScheduler);
414
415         callbackRegistrar.onAdd(OPERATIONAL, FOO_PATH, dataObject -> {
416             updated.set(true);
417             return NextAction.UNREGISTER;
418         }, Duration.ofMillis(10), id -> { });
419
420         await().untilTrue(updated);
421         verify(mockListenerReg).close();
422         verify(mockScheduledFuture).cancel(anyBoolean());
423     }
424 }