Convert DatastoreSnapshotRestore to OSGi DS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DataTreeChangeListenerProxyTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.ArgumentMatchers.eq;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.actor.Props;
22 import akka.actor.Terminated;
23 import akka.dispatch.ExecutionContexts;
24 import akka.dispatch.Futures;
25 import akka.testkit.javadsl.TestKit;
26 import akka.util.Timeout;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import java.time.Duration;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.Test;
32 import org.mockito.stubbing.Answer;
33 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
34 import org.opendaylight.controller.cluster.datastore.config.Configuration;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
36 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
37 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
38 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
39 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
40 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
41 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
42 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
43 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
44 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
45 import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import scala.concurrent.ExecutionContextExecutor;
49 import scala.concurrent.Future;
50
51 public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
52     private final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class);
53
54     @Test(timeout = 10000)
55     public void testSuccessfulRegistration() {
56         final TestKit kit = new TestKit(getSystem());
57         ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
58             mock(Configuration.class));
59
60         final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
61         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
62                 actorUtils, mockListener, path);
63
64         new Thread(() -> proxy.init("shard-1")).start();
65
66         Duration timeout = Duration.ofSeconds(5);
67         FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
68         assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
69
70         kit.reply(new LocalShardFound(kit.getRef()));
71
72         RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout,
73             RegisterDataTreeChangeListener.class);
74         assertEquals("getPath", path, registerMsg.getPath());
75         assertFalse("isRegisterOnAllInstances", registerMsg.isRegisterOnAllInstances());
76
77         kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
78
79         for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
80             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
81         }
82
83         assertEquals("getListenerRegistrationActor", getSystem().actorSelection(kit.getRef().path()),
84             proxy.getListenerRegistrationActor());
85
86         kit.watch(proxy.getDataChangeListenerActor());
87
88         proxy.close();
89
90         // The listener registration actor should get a Close message
91         kit.expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
92
93         // The DataChangeListener actor should be terminated
94         kit.expectMsgClass(timeout, Terminated.class);
95
96         proxy.close();
97
98         kit.expectNoMessage();
99     }
100
101     @Test(timeout = 10000)
102     public void testSuccessfulRegistrationForClusteredListener() {
103         final TestKit kit = new TestKit(getSystem());
104         ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
105             mock(Configuration.class));
106
107         ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
108             ClusteredDOMDataTreeChangeListener.class);
109
110         final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
111         final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
112                 new DataTreeChangeListenerProxy<>(actorUtils, mockClusteredListener, path);
113
114         new Thread(() -> proxy.init("shard-1")).start();
115
116         Duration timeout = Duration.ofSeconds(5);
117         FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
118         assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
119
120         kit.reply(new LocalShardFound(kit.getRef()));
121
122         RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout,
123             RegisterDataTreeChangeListener.class);
124         assertEquals("getPath", path, registerMsg.getPath());
125         assertTrue("isRegisterOnAllInstances", registerMsg.isRegisterOnAllInstances());
126
127         proxy.close();
128     }
129
130     @Test(timeout = 10000)
131     public void testLocalShardNotFound() {
132         final TestKit kit = new TestKit(getSystem());
133         ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
134             mock(Configuration.class));
135
136         final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
137         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
138                 actorUtils, mockListener, path);
139
140         new Thread(() -> proxy.init("shard-1")).start();
141
142         Duration timeout = Duration.ofSeconds(5);
143         FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
144         assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
145
146         kit.reply(new LocalShardNotFound("shard-1"));
147
148         kit.expectNoMessage(Duration.ofSeconds(1));
149
150         proxy.close();
151     }
152
153     @Test(timeout = 10000)
154     public void testLocalShardNotInitialized() {
155         final TestKit kit = new TestKit(getSystem());
156         ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
157             mock(Configuration.class));
158
159         final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
160         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
161                 actorUtils, mockListener, path);
162
163         new Thread(() -> proxy.init("shard-1")).start();
164
165         Duration timeout = Duration.ofSeconds(5);
166         FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
167         assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
168
169         kit.reply(new NotInitializedException("not initialized"));
170
171         kit.within(Duration.ofSeconds(1), () ->  {
172             kit.expectNoMessage();
173             return null;
174         });
175
176         proxy.close();
177     }
178
179     @Test
180     public void testFailedRegistration() {
181         final TestKit kit = new TestKit(getSystem());
182         ActorSystem mockActorSystem = mock(ActorSystem.class);
183
184         ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration");
185         doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
186         ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
187
188         ActorUtils actorUtils = mock(ActorUtils.class);
189         final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
190
191         doReturn(executor).when(actorUtils).getClientDispatcher();
192         doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
193         doReturn(mockActorSystem).when(actorUtils).getActorSystem();
194
195         String shardName = "shard-1";
196         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
197                 actorUtils, mockListener, path);
198
199         doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
200         doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync(eq(shardName));
201         doReturn(Futures.failed(new RuntimeException("mock"))).when(actorUtils).executeOperationAsync(
202             any(ActorRef.class), any(Object.class), any(Timeout.class));
203         doReturn(mock(DatastoreContext.class)).when(actorUtils).getDatastoreContext();
204
205         proxy.init("shard-1");
206
207         assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
208
209         proxy.close();
210     }
211
212     @Test
213     public void testCloseBeforeRegistration() {
214         final TestKit kit = new TestKit(getSystem());
215         ActorUtils actorUtils = mock(ActorUtils.class);
216
217         String shardName = "shard-1";
218
219         doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
220         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorUtils).getClientDispatcher();
221         doReturn(getSystem()).when(actorUtils).getActorSystem();
222         doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorUtils).getNotificationDispatcherPath();
223         doReturn(getSystem().actorSelection(kit.getRef().path())).when(actorUtils).actorSelection(
224             kit.getRef().path());
225         doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
226         doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync(eq(shardName));
227
228         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
229                 actorUtils, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
230
231         Answer<Future<Object>> answer = invocation -> {
232             proxy.close();
233             return Futures.successful((Object) new RegisterDataTreeNotificationListenerReply(kit.getRef()));
234         };
235
236         doAnswer(answer).when(actorUtils).executeOperationAsync(any(ActorRef.class), any(Object.class),
237             any(Timeout.class));
238
239         proxy.init(shardName);
240
241         kit.expectMsgClass(Duration.ofSeconds(5), CloseDataTreeNotificationListenerRegistration.class);
242
243         assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
244     }
245 }