2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.mockito.Matchers.any;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doAnswer;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.mock;
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSystem;
17 import akka.actor.Props;
18 import akka.actor.Terminated;
19 import akka.dispatch.ExecutionContexts;
20 import akka.dispatch.Futures;
21 import akka.testkit.JavaTestKit;
22 import akka.util.Timeout;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Assert;
27 import org.junit.Test;
28 import org.mockito.Mockito;
29 import org.mockito.invocation.InvocationOnMock;
30 import org.mockito.stubbing.Answer;
31 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
33 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
34 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
35 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
36 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
38 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
39 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
40 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
41 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
42 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
43 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
44 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
45 import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import scala.concurrent.ExecutionContextExecutor;
49 import scala.concurrent.Future;
50 import scala.concurrent.duration.FiniteDuration;
53 * Unit tests for DataChangeListenerRegistrationProxy.
55 * @author Thomas Pantelis
57 public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
// Shared mock listener handed to the proxies under test. The unchecked
// suppression is needed because Mockito.mock() is given the raw
// AsyncDataChangeListener class while the field type is parameterized.
59 @SuppressWarnings("unchecked")
60 private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockListener =
61 Mockito.mock(AsyncDataChangeListener.class);
// Verifies that getInstance() hands back the same listener that was passed
// to the proxy's constructor. The ActorContext is a bare mock because only
// the constructor and getInstance() are exercised here.
64 public void testGetInstance() throws Exception {
65 DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
66 "shard", Mockito.mock(ActorContext.class), mockListener);
68 Assert.assertEquals(mockListener, proxy.getInstance());
// Happy-path registration for a plain (non-clustered) listener: the test kit
// plays the role of every remote actor, answering the shard lookup and the
// registration request, then checks the proxy's resulting state.
72 public void testSuccessfulRegistration() {
73 new JavaTestKit(getSystem()) {{
// The test kit's own ref is handed to the ActorContext; the FindLocalShard
// lookup is expected to arrive at the test kit below.
74 ActorContext actorContext = new ActorContext(getSystem(), getRef(),
75 mock(ClusterWrapper.class), mock(Configuration.class));
77 final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
78 "shard-1", actorContext, mockListener);
80 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
81 final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
// Start the registration for the chosen path and scope.
85 proxy.init(path, scope);
90 FiniteDuration timeout = duration("5 seconds");
// Step 1: the proxy must look up the shard it was constructed with.
91 FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
92 Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
// Report the test kit itself as the local shard so the registration
// message is also delivered here.
94 reply(new LocalShardFound(getRef()));
// Step 2: the registration request must carry the path and scope given to init().
96 RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
97 Assert.assertEquals("getPath", path, registerMsg.getPath());
98 Assert.assertEquals("getScope", scope, registerMsg.getScope());
// mockListener is a plain AsyncDataChangeListener, so the flag is expected to be false.
99 Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
// Name the test kit as the listener registration actor in the reply.
101 reply(new RegisterChangeListenerReply(getRef()));
// Poll until the proxy has recorded the registration actor:
// at most 100 iterations of 50 ms, i.e. roughly 5 seconds.
103 for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
104 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
// The recorded registration actor must be a selection on the test kit's path.
107 Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
108 proxy.getListenerRegistrationActor());
// Watch the listener actor so its termination is reported to the test kit.
110 watch(proxy.getDataChangeListenerActor());
// NOTE(review): the statement that triggers the close (presumably a call to
// proxy.close()) is not visible in this excerpt - confirm against the full file.
114 // The listener registration actor should get a Close message
115 expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
117 // The DataChangeListener actor should be terminated
118 expectMsgClass(timeout, Terminated.class);
// Same flow as the plain-listener success test, but the listener is a mock of
// ClusteredDOMDataChangeListener; the registration request is then expected
// to have isRegisterOnAllInstances set to true.
127 public void testSuccessfulRegistrationForClusteredListener() {
128 new JavaTestKit(getSystem()) {{
// The test kit's own ref is handed to the ActorContext; the FindLocalShard
// lookup is expected to arrive at the test kit below.
129 ActorContext actorContext = new ActorContext(getSystem(), getRef(),
130 mock(ClusterWrapper.class), mock(Configuration.class));
// A clustered listener mock, used instead of the shared mockListener field.
132 AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockClusteredListener =
133 Mockito.mock(ClusteredDOMDataChangeListener.class);
135 final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
136 "shard-1", actorContext, mockClusteredListener);
138 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
139 final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
// Start the registration for the chosen path and scope.
143 proxy.init(path, scope);
148 FiniteDuration timeout = duration("5 seconds");
// Step 1: the proxy must look up the shard it was constructed with.
149 FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
150 Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
// Report the test kit itself as the local shard so the registration
// message is also delivered here.
152 reply(new LocalShardFound(getRef()));
// Step 2: the registration request must carry the path and scope given to init().
154 RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
155 Assert.assertEquals("getPath", path, registerMsg.getPath());
156 Assert.assertEquals("getScope", scope, registerMsg.getScope());
// The distinguishing assertion of this test: a clustered listener yields true.
157 Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
// Name the test kit as the listener registration actor in the reply.
159 reply(new RegisterChangeListenerReply(getRef()));
// Poll until the proxy has recorded the registration actor:
// at most 100 iterations of 50 ms, i.e. roughly 5 seconds.
161 for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
162 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
// The recorded registration actor must be a selection on the test kit's path.
165 Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
166 proxy.getListenerRegistrationActor());
// Watch the listener actor so its termination is reported to the test kit.
168 watch(proxy.getDataChangeListenerActor());
// NOTE(review): the statement that triggers the close (presumably a call to
// proxy.close()) is not visible in this excerpt - confirm against the full file.
172 // The listener registration actor should get a Close message
173 expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
175 // The DataChangeListener actor should be terminated
176 expectMsgClass(timeout, Terminated.class);
// Checks the case where the shard lookup is answered with LocalShardNotFound:
// after that reply the test kit must not receive any further message
// (in particular no RegisterChangeListener) for one second.
185 public void testLocalShardNotFound() {
186 new JavaTestKit(getSystem()) {{
// The test kit's own ref is handed to the ActorContext; the FindLocalShard
// lookup is expected to arrive at the test kit below.
187 ActorContext actorContext = new ActorContext(getSystem(), getRef(),
188 mock(ClusterWrapper.class), mock(Configuration.class));
190 final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
191 "shard-1", actorContext, mockListener);
193 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
194 final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
// Start the registration for the chosen path and scope.
198 proxy.init(path, scope);
203 FiniteDuration timeout = duration("5 seconds");
// The proxy must still look up the shard it was constructed with.
204 FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
205 Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
// Answer the lookup negatively.
207 reply(new LocalShardNotFound("shard-1"));
// No follow-up traffic is expected once the shard was reported missing.
209 expectNoMsg(duration("1 seconds"));
// Checks the case where the shard lookup is answered with a
// NotInitializedException instead of a shard reference.
214 public void testLocalShardNotInitialized() {
215 new JavaTestKit(getSystem()) {{
// The test kit's own ref is handed to the ActorContext; the FindLocalShard
// lookup is expected to arrive at the test kit below.
216 ActorContext actorContext = new ActorContext(getSystem(), getRef(),
217 mock(ClusterWrapper.class), mock(Configuration.class));
219 final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
220 "shard-1", actorContext, mockListener);
222 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
223 final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
// Start the registration for the chosen path and scope.
227 proxy.init(path, scope);
232 FiniteDuration timeout = duration("5 seconds");
// The proxy must still look up the shard it was constructed with.
233 FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
234 Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
// Answer the lookup with the "not initialized" failure object.
236 reply(new NotInitializedException("not initialized"));
// The follow-up check is bounded to a one-second window.
// NOTE(review): the body of run() is not visible in this excerpt; presumably it
// asserts that no further message arrives - confirm against the full file.
238 new Within(duration("1 seconds")) {
240 protected void run() {
// Checks that a failed registration request leaves the proxy without a
// listener registration actor. The ActorContext is fully mocked here, so no
// messages go through the test kit.
248 public void testFailedRegistration() {
249 new JavaTestKit(getSystem()) {{
250 ActorSystem mockActorSystem = mock(ActorSystem.class);
// Any actorOf(Props) call on the mocked system yields this real DoNothingActor.
252 ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class),
253 "testFailedRegistration");
254 doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
// Execution context backed by a same-thread executor; the assertion after
// init() is made immediately, without any waiting or polling.
255 ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
256 MoreExecutors.sameThreadExecutor());
259 ActorContext actorContext = mock(ActorContext.class);
261 doReturn(executor).when(actorContext).getClientDispatcher();
263 String shardName = "shard-1";
264 final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
265 shardName, actorContext, mockListener);
267 doReturn(mockActorSystem).when(actorContext).getActorSystem();
268 doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
// The shard lookup succeeds (resolving to the test kit's ref) ...
269 doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
// ... but every asynchronous operation fails with a RuntimeException.
270 doReturn(Futures.failed(new RuntimeException("mock"))).
271 when(actorContext).executeOperationAsync(any(ActorRef.class),
272 any(Object.class), any(Timeout.class));
273 doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
275 proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
276 AsyncDataBroker.DataChangeScope.ONE);
// After the failed operation no registration actor may have been recorded.
278 Assert.assertEquals("getListenerRegistrationActor", null,
279 proxy.getListenerRegistrationActor());
// Checks the proxy's behavior when it is closed before the registration reply
// has been processed: the registration actor named in the reply must receive a
// Close message, and the proxy must not retain a registration actor.
284 public void testCloseBeforeRegistration() {
285 new JavaTestKit(getSystem()) {{
286 ActorContext actorContext = mock(ActorContext.class);
288 String shardName = "shard-1";
289 final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
290 shardName, actorContext, mockListener);
// Stub the mocked context with a default DatastoreContext, the real actor
// system and its default global dispatcher.
292 doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
293 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
294 doReturn(getSystem()).when(actorContext).getActorSystem();
295 doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath();
// Selections on the test kit's path resolve to the test kit, so a Close
// message sent through such a selection can be observed below.
296 doReturn(getSystem().actorSelection(getRef().path())).
297 when(actorContext).actorSelection(getRef().path());
298 doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
// The shard lookup succeeds and resolves to the test kit's ref.
299 doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
// Custom answer for executeOperationAsync: completes successfully with a
// reply naming the test kit as the listener registration actor.
// NOTE(review): part of the answer body is not visible in this excerpt;
// presumably the proxy is closed here before the reply is returned - confirm.
301 Answer<Future<Object>> answer = new Answer<Future<Object>>() {
303 public Future<Object> answer(InvocationOnMock invocation) {
305 return Futures.successful((Object)new RegisterChangeListenerReply(getRef()));
309 doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
310 any(Object.class), any(Timeout.class));
312 proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
313 AsyncDataBroker.DataChangeScope.ONE);
// The registration actor (the test kit) must be told to close.
315 expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
// The proxy must not hold on to the registration actor.
317 Assert.assertEquals("getListenerRegistrationActor", null,
318 proxy.getListenerRegistrationActor());