Bug 8231: Fix testChangeListenerRegistration failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DataTreeChangeListenerProxyTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.mockito.Matchers.any;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doAnswer;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.mock;
15
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.Props;
19 import akka.actor.Terminated;
20 import akka.dispatch.ExecutionContexts;
21 import akka.dispatch.Futures;
22 import akka.testkit.JavaTestKit;
23 import akka.util.Timeout;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.util.concurrent.TimeUnit;
27 import org.junit.Assert;
28 import org.junit.Test;
29 import org.mockito.stubbing.Answer;
30 import org.opendaylight.controller.cluster.datastore.config.Configuration;
31 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
33 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
34 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
35 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
36 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
38 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
39 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
40 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
41 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
42 import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import scala.concurrent.ExecutionContextExecutor;
46 import scala.concurrent.Future;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
50     private final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class);
51
52     @Test(timeout = 10000)
53     public void testSuccessfulRegistration() {
54         new JavaTestKit(getSystem()) {
55             {
56                 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
57                         mock(Configuration.class));
58
59                 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
60                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
61                         actorContext, mockListener, path);
62
63                 new Thread() {
64                     @Override
65                     public void run() {
66                         proxy.init("shard-1");
67                     }
68
69                 }.start();
70
71                 FiniteDuration timeout = duration("5 seconds");
72                 FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
73                 Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
74
75                 reply(new LocalShardFound(getRef()));
76
77                 RegisterDataTreeChangeListener registerMsg = expectMsgClass(timeout,
78                         RegisterDataTreeChangeListener.class);
79                 Assert.assertEquals("getPath", path, registerMsg.getPath());
80                 Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
81
82                 reply(new RegisterDataTreeChangeListenerReply(getRef()));
83
84                 for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
85                     Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
86                 }
87
88                 Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
89                         proxy.getListenerRegistrationActor());
90
91                 watch(proxy.getDataChangeListenerActor());
92
93                 proxy.close();
94
95                 // The listener registration actor should get a Close message
96                 expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
97
98                 // The DataChangeListener actor should be terminated
99                 expectMsgClass(timeout, Terminated.class);
100
101                 proxy.close();
102
103                 expectNoMsg();
104             }
105         };
106     }
107
108     @Test(timeout = 10000)
109     public void testSuccessfulRegistrationForClusteredListener() {
110         new JavaTestKit(getSystem()) {
111             {
112                 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
113                         mock(Configuration.class));
114
115                 ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
116                         ClusteredDOMDataTreeChangeListener.class);
117
118                 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
119                 final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
120                         new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path);
121
122                 new Thread() {
123                     @Override
124                     public void run() {
125                         proxy.init("shard-1");
126                     }
127
128                 }.start();
129
130                 FiniteDuration timeout = duration("5 seconds");
131                 FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
132                 Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
133
134                 reply(new LocalShardFound(getRef()));
135
136                 RegisterDataTreeChangeListener registerMsg = expectMsgClass(timeout,
137                         RegisterDataTreeChangeListener.class);
138                 Assert.assertEquals("getPath", path, registerMsg.getPath());
139                 Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
140
141                 proxy.close();
142             }
143         };
144     }
145
146     @Test(timeout = 10000)
147     public void testLocalShardNotFound() {
148         new JavaTestKit(getSystem()) {
149             {
150                 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
151                         mock(Configuration.class));
152
153                 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
154                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
155                         actorContext, mockListener, path);
156
157                 new Thread() {
158                     @Override
159                     public void run() {
160                         proxy.init("shard-1");
161                     }
162
163                 }.start();
164
165                 FiniteDuration timeout = duration("5 seconds");
166                 FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
167                 Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
168
169                 reply(new LocalShardNotFound("shard-1"));
170
171                 expectNoMsg(duration("1 seconds"));
172
173                 proxy.close();
174             }
175         };
176     }
177
178     @Test(timeout = 10000)
179     public void testLocalShardNotInitialized() {
180         new JavaTestKit(getSystem()) {
181             {
182                 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
183                         mock(Configuration.class));
184
185                 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
186                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
187                         actorContext, mockListener, path);
188
189                 new Thread() {
190                     @Override
191                     public void run() {
192                         proxy.init("shard-1");
193                     }
194
195                 }.start();
196
197                 FiniteDuration timeout = duration("5 seconds");
198                 FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
199                 Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
200
201                 reply(new NotInitializedException("not initialized"));
202
203                 new Within(duration("1 seconds")) {
204                     @Override
205                     protected void run() {
206                         expectNoMsg();
207                     }
208                 };
209
210                 proxy.close();
211             }
212         };
213     }
214
215     @Test
216     public void testFailedRegistration() {
217         new JavaTestKit(getSystem()) {
218             {
219                 ActorSystem mockActorSystem = mock(ActorSystem.class);
220
221                 ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration");
222                 doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
223                 ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
224
225                 ActorContext actorContext = mock(ActorContext.class);
226                 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
227
228                 doReturn(executor).when(actorContext).getClientDispatcher();
229                 doReturn(mockActorSystem).when(actorContext).getActorSystem();
230
231                 String shardName = "shard-1";
232                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
233                         actorContext, mockListener, path);
234
235                 doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
236                 doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
237                 doReturn(Futures.failed(new RuntimeException("mock"))).when(actorContext)
238                         .executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class));
239                 doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
240
241                 proxy.init("shard-1");
242
243                 Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
244
245                 proxy.close();
246             }
247         };
248     }
249
250     @Test
251     public void testCloseBeforeRegistration() {
252         new JavaTestKit(getSystem()) {
253             {
254                 ActorContext actorContext = mock(ActorContext.class);
255
256                 String shardName = "shard-1";
257
258                 doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
259                 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
260                 doReturn(getSystem()).when(actorContext).getActorSystem();
261                 doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath();
262                 doReturn(getSystem().actorSelection(getRef().path())).when(actorContext)
263                         .actorSelection(getRef().path());
264                 doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
265                 doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
266
267                 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
268                         actorContext, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
269
270                 Answer<Future<Object>> answer = invocation -> {
271                     proxy.close();
272                     return Futures.successful((Object) new RegisterDataTreeChangeListenerReply(getRef()));
273                 };
274
275                 doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), any(Object.class),
276                         any(Timeout.class));
277
278                 proxy.init(shardName);
279
280                 expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class);
281
282                 Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
283             }
284         };
285     }
286 }