b5567fa1d59b247d2f28c4804989e0c6b95a495b
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxyTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.mockito.Matchers.any;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doAnswer;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.mock;
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSystem;
17 import akka.actor.Props;
18 import akka.actor.Terminated;
19 import akka.dispatch.ExecutionContexts;
20 import akka.dispatch.Futures;
21 import akka.testkit.JavaTestKit;
22 import akka.util.Timeout;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Assert;
27 import org.junit.Test;
28 import org.mockito.Mockito;
29 import org.mockito.invocation.InvocationOnMock;
30 import org.mockito.stubbing.Answer;
31 import org.opendaylight.controller.cluster.datastore.config.Configuration;
32 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
34 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
35 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
36 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
38 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
41 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
42 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
43 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
44 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
45 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
46 import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
49 import scala.concurrent.ExecutionContextExecutor;
50 import scala.concurrent.Future;
51 import scala.concurrent.duration.FiniteDuration;
52
53 /**
54  * Unit tests for DataChangeListenerRegistrationProxy.
55  *
56  * @author Thomas Pantelis
57  */
58 public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
59
60     @SuppressWarnings("unchecked")
61     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockListener =
62             Mockito.mock(AsyncDataChangeListener.class);
63
64     @Test
65     public void testGetInstance() throws Exception {
66         DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
67                 "shard", Mockito.mock(ActorContext.class), mockListener);
68
69         Assert.assertEquals(mockListener, proxy.getInstance());
70     }
71
72     @Test(timeout=10000)
73     public void testSuccessfulRegistration() {
74         new JavaTestKit(getSystem()) {{
75             ActorContext actorContext = new ActorContext(getSystem(), getRef(),
76                     mock(ClusterWrapper.class), mock(Configuration.class));
77
78             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
79                     "shard-1", actorContext, mockListener);
80
81             final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
82             final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
83             new Thread() {
84                 @Override
85                 public void run() {
86                     proxy.init(path, scope);
87                 }
88
89             }.start();
90
91             FiniteDuration timeout = duration("5 seconds");
92             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
93             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
94
95             reply(new LocalShardFound(getRef()));
96
97             RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
98             Assert.assertEquals("getPath", path, registerMsg.getPath());
99             Assert.assertEquals("getScope", scope, registerMsg.getScope());
100             Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
101
102             reply(new RegisterChangeListenerReply(getRef()));
103
104             for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
105                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
106             }
107
108             Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
109                     proxy.getListenerRegistrationActor());
110
111             watch(proxy.getDataChangeListenerActor());
112
113             proxy.close();
114
115             // The listener registration actor should get a Close message
116             expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
117
118             // The DataChangeListener actor should be terminated
119             expectMsgClass(timeout, Terminated.class);
120
121             proxy.close();
122
123             expectNoMsg();
124         }};
125     }
126
127     @Test(timeout=10000)
128     public void testSuccessfulRegistrationForClusteredListener() {
129         new JavaTestKit(getSystem()) {{
130             ActorContext actorContext = new ActorContext(getSystem(), getRef(),
131                 mock(ClusterWrapper.class), mock(Configuration.class));
132
133             AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockClusteredListener =
134                 Mockito.mock(ClusteredDOMDataChangeListener.class);
135
136             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
137                 "shard-1", actorContext, mockClusteredListener);
138
139             final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
140             final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
141             new Thread() {
142                 @Override
143                 public void run() {
144                     proxy.init(path, scope);
145                 }
146
147             }.start();
148
149             FiniteDuration timeout = duration("5 seconds");
150             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
151             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
152
153             reply(new LocalShardFound(getRef()));
154
155             RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
156             Assert.assertEquals("getPath", path, registerMsg.getPath());
157             Assert.assertEquals("getScope", scope, registerMsg.getScope());
158             Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
159
160             reply(new RegisterChangeListenerReply(getRef()));
161
162             for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
163                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
164             }
165
166             Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
167                 proxy.getListenerRegistrationActor());
168
169             watch(proxy.getDataChangeListenerActor());
170
171             proxy.close();
172
173             // The listener registration actor should get a Close message
174             expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
175
176             // The DataChangeListener actor should be terminated
177             expectMsgClass(timeout, Terminated.class);
178
179             proxy.close();
180
181             expectNoMsg();
182         }};
183     }
184
185     @Test(timeout=10000)
186     public void testLocalShardNotFound() {
187         new JavaTestKit(getSystem()) {{
188             ActorContext actorContext = new ActorContext(getSystem(), getRef(),
189                     mock(ClusterWrapper.class), mock(Configuration.class));
190
191             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
192                     "shard-1", actorContext, mockListener);
193
194             final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
195             final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
196             new Thread() {
197                 @Override
198                 public void run() {
199                     proxy.init(path, scope);
200                 }
201
202             }.start();
203
204             FiniteDuration timeout = duration("5 seconds");
205             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
206             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
207
208             reply(new LocalShardNotFound("shard-1"));
209
210             expectNoMsg(duration("1 seconds"));
211         }};
212     }
213
214     @Test(timeout=10000)
215     public void testLocalShardNotInitialized() {
216         new JavaTestKit(getSystem()) {{
217             ActorContext actorContext = new ActorContext(getSystem(), getRef(),
218                     mock(ClusterWrapper.class), mock(Configuration.class));
219
220             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
221                     "shard-1", actorContext, mockListener);
222
223             final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
224             final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
225             new Thread() {
226                 @Override
227                 public void run() {
228                     proxy.init(path, scope);
229                 }
230
231             }.start();
232
233             FiniteDuration timeout = duration("5 seconds");
234             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
235             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
236
237             reply(new NotInitializedException("not initialized"));
238
239             new Within(duration("1 seconds")) {
240                 @Override
241                 protected void run() {
242                     expectNoMsg();
243                 }
244             };
245         }};
246     }
247
248     @Test
249     public void testFailedRegistration() {
250         new JavaTestKit(getSystem()) {{
251             ActorSystem mockActorSystem = mock(ActorSystem.class);
252
253             ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class),
254                     "testFailedRegistration");
255             doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
256             ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
257                     MoreExecutors.directExecutor());
258
259
260             ActorContext actorContext = mock(ActorContext.class);
261
262             doReturn(executor).when(actorContext).getClientDispatcher();
263
264             String shardName = "shard-1";
265             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
266                     shardName, actorContext, mockListener);
267
268             doReturn(mockActorSystem).when(actorContext).getActorSystem();
269             doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
270             doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
271             doReturn(Futures.failed(new RuntimeException("mock"))).
272                     when(actorContext).executeOperationAsync(any(ActorRef.class),
273                             any(Object.class), any(Timeout.class));
274             doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
275
276             proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
277                     AsyncDataBroker.DataChangeScope.ONE);
278
279             Assert.assertEquals("getListenerRegistrationActor", null,
280                     proxy.getListenerRegistrationActor());
281         }};
282     }
283
284     @Test
285     public void testCloseBeforeRegistration() {
286         new JavaTestKit(getSystem()) {{
287             ActorContext actorContext = mock(ActorContext.class);
288
289             String shardName = "shard-1";
290             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
291                     shardName, actorContext, mockListener);
292
293             doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
294             doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
295             doReturn(getSystem()).when(actorContext).getActorSystem();
296             doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath();
297             doReturn(getSystem().actorSelection(getRef().path())).
298                     when(actorContext).actorSelection(getRef().path());
299             doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
300             doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
301
302             Answer<Future<Object>> answer = new Answer<Future<Object>>() {
303                 @Override
304                 public Future<Object> answer(InvocationOnMock invocation) {
305                     proxy.close();
306                     return Futures.successful((Object)new RegisterChangeListenerReply(getRef()));
307                 }
308             };
309
310             doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
311                     any(Object.class), any(Timeout.class));
312
313             proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
314                     AsyncDataBroker.DataChangeScope.ONE);
315
316             expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.class);
317
318             Assert.assertEquals("getListenerRegistrationActor", null,
319                     proxy.getListenerRegistrationActor());
320         }};
321     }
322 }