Cleanup TestKit use
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DataTreeChangeListenerProxyTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.eq;
13 import static org.mockito.Mockito.doAnswer;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.mock;
16
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSystem;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.dispatch.ExecutionContexts;
22 import akka.dispatch.Futures;
23 import akka.testkit.javadsl.TestKit;
24 import akka.util.Timeout;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.Test;
29 import org.mockito.stubbing.Answer;
30 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
31 import org.opendaylight.controller.cluster.datastore.config.Configuration;
32 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
34 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
35 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
36 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
38 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
41 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
42 import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
43 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import scala.concurrent.ExecutionContextExecutor;
46 import scala.concurrent.Future;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
50     private final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class);
51
52     @Test(timeout = 10000)
53     public void testSuccessfulRegistration() {
54         final TestKit kit = new TestKit(getSystem());
55         ActorContext actorContext = new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
56             mock(Configuration.class));
57
58         final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
59         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
60                 actorContext, mockListener, path);
61
62         new Thread(() -> proxy.init("shard-1")).start();
63
64         FiniteDuration timeout = kit.duration("5 seconds");
65         FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
66         assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
67
68         kit.reply(new LocalShardFound(kit.getRef()));
69
70         RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout,
71             RegisterDataTreeChangeListener.class);
72         assertEquals("getPath", path, registerMsg.getPath());
73         assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
74
75         kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
76
77         for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
78             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
79         }
80
81         assertEquals("getListenerRegistrationActor", getSystem().actorSelection(kit.getRef().path()),
82             proxy.getListenerRegistrationActor());
83
84         kit.watch(proxy.getDataChangeListenerActor());
85
86         proxy.close();
87
88         // The listener registration actor should get a Close message
89         kit.expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
90
91         // The DataChangeListener actor should be terminated
92         kit.expectMsgClass(timeout, Terminated.class);
93
94         proxy.close();
95
96         kit.expectNoMessage();
97     }
98
99     @Test(timeout = 10000)
100     public void testSuccessfulRegistrationForClusteredListener() {
101         final TestKit kit = new TestKit(getSystem());
102         ActorContext actorContext = new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
103             mock(Configuration.class));
104
105         ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
106             ClusteredDOMDataTreeChangeListener.class);
107
108         final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
109         final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
110                 new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path);
111
112         new Thread(() -> proxy.init("shard-1")).start();
113
114         FiniteDuration timeout = kit.duration("5 seconds");
115         FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
116         assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
117
118         kit.reply(new LocalShardFound(kit.getRef()));
119
120         RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout,
121             RegisterDataTreeChangeListener.class);
122         assertEquals("getPath", path, registerMsg.getPath());
123         assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
124
125         proxy.close();
126     }
127
128     @Test(timeout = 10000)
129     public void testLocalShardNotFound() {
130         final TestKit kit = new TestKit(getSystem());
131         ActorContext actorContext = new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
132             mock(Configuration.class));
133
134         final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
135         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
136                 actorContext, mockListener, path);
137
138         new Thread(() -> proxy.init("shard-1")).start();
139
140         FiniteDuration timeout = kit.duration("5 seconds");
141         FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
142         assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
143
144         kit.reply(new LocalShardNotFound("shard-1"));
145
146         kit.expectNoMessage(kit.duration("1 seconds"));
147
148         proxy.close();
149     }
150
151     @Test(timeout = 10000)
152     public void testLocalShardNotInitialized() {
153         final TestKit kit = new TestKit(getSystem());
154         ActorContext actorContext = new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
155             mock(Configuration.class));
156
157         final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
158         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
159                 actorContext, mockListener, path);
160
161         new Thread(() -> proxy.init("shard-1")).start();
162
163         FiniteDuration timeout = kit.duration("5 seconds");
164         FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
165         assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
166
167         kit.reply(new NotInitializedException("not initialized"));
168
169         kit.within(kit.duration("1 seconds"), () ->  {
170             kit.expectNoMessage();
171             return null;
172         });
173
174         proxy.close();
175     }
176
177     @Test
178     public void testFailedRegistration() {
179         final TestKit kit = new TestKit(getSystem());
180         ActorSystem mockActorSystem = mock(ActorSystem.class);
181
182         ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration");
183         doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
184         ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
185
186         ActorContext actorContext = mock(ActorContext.class);
187         final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
188
189         doReturn(executor).when(actorContext).getClientDispatcher();
190         doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
191         doReturn(mockActorSystem).when(actorContext).getActorSystem();
192
193         String shardName = "shard-1";
194         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
195                 actorContext, mockListener, path);
196
197         doReturn(kit.duration("5 seconds")).when(actorContext).getOperationDuration();
198         doReturn(Futures.successful(kit.getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
199         doReturn(Futures.failed(new RuntimeException("mock"))).when(actorContext).executeOperationAsync(
200             any(ActorRef.class), any(Object.class), any(Timeout.class));
201         doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
202
203         proxy.init("shard-1");
204
205         assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
206
207         proxy.close();
208     }
209
210     @Test
211     public void testCloseBeforeRegistration() {
212         final TestKit kit = new TestKit(getSystem());
213         ActorContext actorContext = mock(ActorContext.class);
214
215         String shardName = "shard-1";
216
217         doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
218         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
219         doReturn(getSystem()).when(actorContext).getActorSystem();
220         doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath();
221         doReturn(getSystem().actorSelection(kit.getRef().path())).when(actorContext).actorSelection(
222             kit.getRef().path());
223         doReturn(kit.duration("5 seconds")).when(actorContext).getOperationDuration();
224         doReturn(Futures.successful(kit.getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
225
226         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
227                 actorContext, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
228
229         Answer<Future<Object>> answer = invocation -> {
230             proxy.close();
231             return Futures.successful((Object) new RegisterDataTreeNotificationListenerReply(kit.getRef()));
232         };
233
234         doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), any(Object.class),
235             any(Timeout.class));
236
237         proxy.init(shardName);
238
239         kit.expectMsgClass(kit.duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class);
240
241         assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
242     }
243 }