X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDataChangeListenerRegistrationProxyTest.java;h=aa95e78d888ca1dfc7c0a0b699bfa26bfbea76dd;hp=8ad2e899aab5b5fa8ad7bf3e4a74c3ca66210964;hb=d594cf3be29ab746695eb5a3b0d220be89b57566;hpb=cccc148d3cd2b5a2a7c86da2e74f2ea43fa00b93 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index 8ad2e899aa..aa95e78d88 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -7,10 +7,10 @@ */ package org.opendaylight.controller.cluster.datastore; -import static org.mockito.Mockito.any; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -28,7 +28,8 @@ import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; @@ -36,11 +37,13 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import scala.concurrent.ExecutionContextExecutor; @@ -94,8 +97,9 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class); Assert.assertEquals("getPath", path, registerMsg.getPath()); Assert.assertEquals("getScope", scope, registerMsg.getScope()); + Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances()); - reply(new RegisterChangeListenerReply(getRef().path())); + reply(new RegisterChangeListenerReply(getRef())); for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) { Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); @@ -120,6 +124,64 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { }}; } + @Test(timeout=10000) + public void testSuccessfulRegistrationForClusteredListener() { + new JavaTestKit(getSystem()) {{ + ActorContext actorContext = new ActorContext(getSystem(), getRef(), + mock(ClusterWrapper.class), mock(Configuration.class)); + + AsyncDataChangeListener> mockClusteredListener = + Mockito.mock(ClusteredDOMDataChangeListener.class); + + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + "shard-1", actorContext, mockClusteredListener); + + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; + new Thread() { + @Override + public void run() { + proxy.init(path, scope); + } + + }.start(); + + FiniteDuration timeout = duration("5 seconds"); + FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); + Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + + reply(new LocalShardFound(getRef())); + + RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class); + Assert.assertEquals("getPath", path, registerMsg.getPath()); + Assert.assertEquals("getScope", scope, registerMsg.getScope()); + Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances()); + + reply(new RegisterChangeListenerReply(getRef())); + + for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()), + proxy.getListenerRegistrationActor()); + + watch(proxy.getDataChangeListenerActor()); + + proxy.close(); + + // The listener registration actor should get a Close message + expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS); + + // The DataChangeListener actor should be terminated + expectMsgClass(timeout, Terminated.class); + + proxy.close(); + + expectNoMsg(); + }}; + } + @Test(timeout=10000) public void testLocalShardNotFound() { new JavaTestKit(getSystem()) {{ @@ -172,7 +234,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - reply(new ActorNotInitialized()); + reply(new NotInitializedException("not initialized")); new Within(duration("1 seconds")) { @Override @@ -192,7 +254,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { "testFailedRegistration"); doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class)); ExecutionContextExecutor executor = ExecutionContexts.fromExecutor( - MoreExecutors.sameThreadExecutor()); + MoreExecutors.directExecutor()); ActorContext actorContext = mock(ActorContext.class); @@ -231,6 +293,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); doReturn(getSystem()).when(actorContext).getActorSystem(); + doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath(); doReturn(getSystem().actorSelection(getRef().path())). when(actorContext).actorSelection(getRef().path()); doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); @@ -240,7 +303,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { @Override public Future answer(InvocationOnMock invocation) { proxy.close(); - return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path())); + return Futures.successful((Object)new RegisterChangeListenerReply(getRef())); } };