Enabling Data Change Notifications for all nodes in cluster.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DataChangeListenerRegistrationProxyTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.mockito.Matchers.any;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doAnswer;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.mock;
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSystem;
17 import akka.actor.Props;
18 import akka.actor.Terminated;
19 import akka.dispatch.ExecutionContexts;
20 import akka.dispatch.Futures;
21 import akka.testkit.JavaTestKit;
22 import akka.util.Timeout;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Assert;
27 import org.junit.Test;
28 import org.mockito.Mockito;
29 import org.mockito.invocation.InvocationOnMock;
30 import org.mockito.stubbing.Answer;
31 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
33 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
34 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
35 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
36 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
38 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
39 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
40 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
41 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
42 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
43 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
44 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
45 import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import scala.concurrent.ExecutionContextExecutor;
49 import scala.concurrent.Future;
50 import scala.concurrent.duration.FiniteDuration;
51
52 /**
53  * Unit tests for DataChangeListenerRegistrationProxy.
54  *
55  * @author Thomas Pantelis
56  */
57 public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
58
59     @SuppressWarnings("unchecked")
60     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockListener =
61             Mockito.mock(AsyncDataChangeListener.class);
62
63     @Test
64     public void testGetInstance() throws Exception {
65         DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
66                 "shard", Mockito.mock(ActorContext.class), mockListener);
67
68         Assert.assertEquals(mockListener, proxy.getInstance());
69     }
70
71     @Test(timeout=10000)
72     public void testSuccessfulRegistration() {
73         new JavaTestKit(getSystem()) {{
74             ActorContext actorContext = new ActorContext(getSystem(), getRef(),
75                     mock(ClusterWrapper.class), mock(Configuration.class));
76
77             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
78                     "shard-1", actorContext, mockListener);
79
80             final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
81             final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
82             new Thread() {
83                 @Override
84                 public void run() {
85                     proxy.init(path, scope);
86                 }
87
88             }.start();
89
90             FiniteDuration timeout = duration("5 seconds");
91             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
92             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
93
94             reply(new LocalShardFound(getRef()));
95
96             RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
97             Assert.assertEquals("getPath", path, registerMsg.getPath());
98             Assert.assertEquals("getScope", scope, registerMsg.getScope());
99             Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
100
101             reply(new RegisterChangeListenerReply(getRef()));
102
103             for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
104                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
105             }
106
107             Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
108                     proxy.getListenerRegistrationActor());
109
110             watch(proxy.getDataChangeListenerActor());
111
112             proxy.close();
113
114             // The listener registration actor should get a Close message
115             expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
116
117             // The DataChangeListener actor should be terminated
118             expectMsgClass(timeout, Terminated.class);
119
120             proxy.close();
121
122             expectNoMsg();
123         }};
124     }
125
126     @Test(timeout=10000)
127     public void testSuccessfulRegistrationForClusteredListener() {
128         new JavaTestKit(getSystem()) {{
129             ActorContext actorContext = new ActorContext(getSystem(), getRef(),
130                 mock(ClusterWrapper.class), mock(Configuration.class));
131
132             AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockClusteredListener =
133                 Mockito.mock(ClusteredDOMDataChangeListener.class);
134
135             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
136                 "shard-1", actorContext, mockClusteredListener);
137
138             final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
139             final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
140             new Thread() {
141                 @Override
142                 public void run() {
143                     proxy.init(path, scope);
144                 }
145
146             }.start();
147
148             FiniteDuration timeout = duration("5 seconds");
149             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
150             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
151
152             reply(new LocalShardFound(getRef()));
153
154             RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
155             Assert.assertEquals("getPath", path, registerMsg.getPath());
156             Assert.assertEquals("getScope", scope, registerMsg.getScope());
157             Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
158
159             reply(new RegisterChangeListenerReply(getRef()));
160
161             for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
162                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
163             }
164
165             Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
166                 proxy.getListenerRegistrationActor());
167
168             watch(proxy.getDataChangeListenerActor());
169
170             proxy.close();
171
172             // The listener registration actor should get a Close message
173             expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
174
175             // The DataChangeListener actor should be terminated
176             expectMsgClass(timeout, Terminated.class);
177
178             proxy.close();
179
180             expectNoMsg();
181         }};
182     }
183
184     @Test(timeout=10000)
185     public void testLocalShardNotFound() {
186         new JavaTestKit(getSystem()) {{
187             ActorContext actorContext = new ActorContext(getSystem(), getRef(),
188                     mock(ClusterWrapper.class), mock(Configuration.class));
189
190             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
191                     "shard-1", actorContext, mockListener);
192
193             final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
194             final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
195             new Thread() {
196                 @Override
197                 public void run() {
198                     proxy.init(path, scope);
199                 }
200
201             }.start();
202
203             FiniteDuration timeout = duration("5 seconds");
204             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
205             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
206
207             reply(new LocalShardNotFound("shard-1"));
208
209             expectNoMsg(duration("1 seconds"));
210         }};
211     }
212
213     @Test(timeout=10000)
214     public void testLocalShardNotInitialized() {
215         new JavaTestKit(getSystem()) {{
216             ActorContext actorContext = new ActorContext(getSystem(), getRef(),
217                     mock(ClusterWrapper.class), mock(Configuration.class));
218
219             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
220                     "shard-1", actorContext, mockListener);
221
222             final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
223             final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
224             new Thread() {
225                 @Override
226                 public void run() {
227                     proxy.init(path, scope);
228                 }
229
230             }.start();
231
232             FiniteDuration timeout = duration("5 seconds");
233             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
234             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
235
236             reply(new NotInitializedException("not initialized"));
237
238             new Within(duration("1 seconds")) {
239                 @Override
240                 protected void run() {
241                     expectNoMsg();
242                 }
243             };
244         }};
245     }
246
247     @Test
248     public void testFailedRegistration() {
249         new JavaTestKit(getSystem()) {{
250             ActorSystem mockActorSystem = mock(ActorSystem.class);
251
252             ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class),
253                     "testFailedRegistration");
254             doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
255             ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
256                     MoreExecutors.sameThreadExecutor());
257
258
259             ActorContext actorContext = mock(ActorContext.class);
260
261             doReturn(executor).when(actorContext).getClientDispatcher();
262
263             String shardName = "shard-1";
264             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
265                     shardName, actorContext, mockListener);
266
267             doReturn(mockActorSystem).when(actorContext).getActorSystem();
268             doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
269             doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
270             doReturn(Futures.failed(new RuntimeException("mock"))).
271                     when(actorContext).executeOperationAsync(any(ActorRef.class),
272                             any(Object.class), any(Timeout.class));
273             doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
274
275             proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
276                     AsyncDataBroker.DataChangeScope.ONE);
277
278             Assert.assertEquals("getListenerRegistrationActor", null,
279                     proxy.getListenerRegistrationActor());
280         }};
281     }
282
283     @Test
284     public void testCloseBeforeRegistration() {
285         new JavaTestKit(getSystem()) {{
286             ActorContext actorContext = mock(ActorContext.class);
287
288             String shardName = "shard-1";
289             final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
290                     shardName, actorContext, mockListener);
291
292             doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
293             doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
294             doReturn(getSystem()).when(actorContext).getActorSystem();
295             doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath();
296             doReturn(getSystem().actorSelection(getRef().path())).
297                     when(actorContext).actorSelection(getRef().path());
298             doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
299             doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
300
301             Answer<Future<Object>> answer = new Answer<Future<Object>>() {
302                 @Override
303                 public Future<Object> answer(InvocationOnMock invocation) {
304                     proxy.close();
305                     return Futures.successful((Object)new RegisterChangeListenerReply(getRef()));
306                 }
307             };
308
309             doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
310                     any(Object.class), any(Timeout.class));
311
312             proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
313                     AsyncDataBroker.DataChangeScope.ONE);
314
315             expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
316
317             Assert.assertEquals("getListenerRegistrationActor", null,
318                     proxy.getListenerRegistrationActor());
319         }};
320     }
321 }