Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DataTreeChangeListenerProxyTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.Props;
23 import akka.actor.Terminated;
24 import akka.dispatch.ExecutionContexts;
25 import akka.dispatch.Futures;
26 import akka.testkit.javadsl.TestKit;
27 import akka.util.Timeout;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.time.Duration;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.TimeUnit;
35 import java.util.function.Consumer;
36 import org.eclipse.jdt.annotation.NonNullByDefault;
37 import org.junit.Test;
38 import org.mockito.ArgumentCaptor;
39 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
40 import org.opendaylight.controller.cluster.datastore.config.Configuration;
41 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
42 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
43 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
46 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
47 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
48 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
49 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
50 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
51 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53
54 public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
55     private final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class);
56
57     @Test(timeout = 10000)
58     public void testSuccessfulRegistration() {
59         final var kit = new TestKit(getSystem());
60         final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
61             mock(Configuration.class));
62
63         final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
64         final var proxy = startProxyAsync(actorUtils, path, false);
65
66         final var timeout = Duration.ofSeconds(5);
67         final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
68         assertEquals("shard-1", findLocalShard.getShardName());
69
70         kit.reply(new LocalShardFound(kit.getRef()));
71
72         final var registerMsg = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
73         assertEquals(path, registerMsg.getPath());
74         assertFalse(registerMsg.isRegisterOnAllInstances());
75
76         kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
77
78         for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
79             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
80         }
81
82         assertEquals(getSystem().actorSelection(kit.getRef().path()), proxy.getListenerRegistrationActor());
83
84         kit.watch(proxy.getDataChangeListenerActor());
85
86         proxy.close();
87
88         // The listener registration actor should get a Close message
89         kit.expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
90
91         // The DataChangeListener actor should be terminated
92         kit.expectMsgClass(timeout, Terminated.class);
93
94         proxy.close();
95
96         kit.expectNoMessage();
97     }
98
99     @Test(timeout = 10000)
100     public void testSuccessfulRegistrationForClusteredListener() {
101         final var kit = new TestKit(getSystem());
102         final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
103             mock(Configuration.class));
104
105         final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
106         final var proxy = startProxyAsync(actorUtils, path, true);
107
108         final var timeout = Duration.ofSeconds(5);
109         final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
110         assertEquals("shard-1", findLocalShard.getShardName());
111
112         kit.reply(new LocalShardFound(kit.getRef()));
113
114         final var registerMsg = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
115         assertEquals(path, registerMsg.getPath());
116         assertTrue(registerMsg.isRegisterOnAllInstances());
117
118         proxy.close();
119     }
120
121     @Test(timeout = 10000)
122     public void testLocalShardNotFound() {
123         final var kit = new TestKit(getSystem());
124         final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
125             mock(Configuration.class));
126
127         final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
128         final var proxy = startProxyAsync(actorUtils, path, true);
129
130         final var timeout = Duration.ofSeconds(5);
131         final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
132         assertEquals("shard-1", findLocalShard.getShardName());
133
134         kit.reply(new LocalShardNotFound("shard-1"));
135
136         kit.expectNoMessage(Duration.ofSeconds(1));
137
138         proxy.close();
139     }
140
141     @Test(timeout = 10000)
142     public void testLocalShardNotInitialized() {
143         final var kit = new TestKit(getSystem());
144         final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
145             mock(Configuration.class));
146
147         final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
148         final var proxy = startProxyAsync(actorUtils, path, false);
149
150         final var timeout = Duration.ofSeconds(5);
151         final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
152         assertEquals("shard-1", findLocalShard.getShardName());
153
154         kit.reply(new NotInitializedException("not initialized"));
155
156         kit.within(Duration.ofSeconds(1), () ->  {
157             kit.expectNoMessage();
158             return null;
159         });
160
161         proxy.close();
162     }
163
164     @Test
165     public void testFailedRegistration() {
166         final var kit = new TestKit(getSystem());
167         final var mockActorSystem = mock(ActorSystem.class);
168
169         final var mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration");
170         doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
171         final var executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
172
173         final var actorUtils = mock(ActorUtils.class);
174         final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
175
176         doReturn(executor).when(actorUtils).getClientDispatcher();
177         doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
178         doReturn(mockActorSystem).when(actorUtils).getActorSystem();
179
180         doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
181         doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync("shard-1");
182         doReturn(Futures.failed(new RuntimeException("mock"))).when(actorUtils).executeOperationAsync(
183             any(ActorRef.class), any(Object.class), any(Timeout.class));
184
185         final var proxy = DataTreeChangeListenerProxy.of(actorUtils, mockListener, path, true, "shard-1");
186         assertNull(proxy.getListenerRegistrationActor());
187
188         proxy.close();
189     }
190
191     @Test
192     public void testCloseBeforeRegistration() {
193         final var kit = new TestKit(getSystem());
194         final var actorUtils = mock(ActorUtils.class);
195
196         doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
197         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorUtils).getClientDispatcher();
198         doReturn(getSystem()).when(actorUtils).getActorSystem();
199         doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorUtils).getNotificationDispatcherPath();
200         doReturn(getSystem().actorSelection(kit.getRef().path())).when(actorUtils).actorSelection(
201             kit.getRef().path());
202         doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
203         doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync("shard-1");
204
205         final var proxy = createProxy(actorUtils, YangInstanceIdentifier.of(TestModel.TEST_QNAME), true);
206         final var instance = proxy.getKey();
207
208         doAnswer(invocation -> {
209             instance.close();
210             return Futures.successful(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
211         }).when(actorUtils).executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class));
212         proxy.getValue().run();
213
214         kit.expectMsgClass(Duration.ofSeconds(5), CloseDataTreeNotificationListenerRegistration.class);
215
216         assertNull(instance.getListenerRegistrationActor());
217     }
218
219     @NonNullByDefault
220     private DataTreeChangeListenerProxy startProxyAsync(final ActorUtils actorUtils, final YangInstanceIdentifier path,
221             final boolean clustered) {
222         return startProxyAsync(actorUtils, path, clustered, Runnable::run);
223     }
224
225     @NonNullByDefault
226     private DataTreeChangeListenerProxy startProxyAsync(final ActorUtils actorUtils, final YangInstanceIdentifier path,
227             final boolean clustered, final Consumer<Runnable> execute) {
228         final var proxy = createProxy(actorUtils, path, clustered);
229         final var thread = new Thread(proxy.getValue());
230         thread.setDaemon(true);
231         thread.start();
232         return proxy.getKey();
233     }
234
235     @NonNullByDefault
236     private Entry<DataTreeChangeListenerProxy, Runnable> createProxy(final ActorUtils actorUtils,
237             final YangInstanceIdentifier path, final boolean clustered) {
238         final var executor = mock(Executor.class);
239         final var captor = ArgumentCaptor.forClass(Runnable.class);
240         doNothing().when(executor).execute(captor.capture());
241         final var proxy = DataTreeChangeListenerProxy.ofTesting(actorUtils, mockListener, path, clustered, "shard-1",
242             executor);
243         return Map.entry(proxy, captor.getValue());
244     }
245 }