X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDataChangeListenerRegistrationProxyTest.java;h=58aec30a8470035bfd7111c5b4c5badebf26b401;hp=aaf080bdf7d8d50de4a3f31713143389994872a6;hb=2c18b1c5c2f37e620883f308b2a1271616889add;hpb=9fe3fcabfe237c216028a44f508855ff0ff495a2 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index aaf080bdf7..58aec30a84 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -1,108 +1,256 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.cluster.datastore; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; import akka.actor.ActorRef; +import akka.actor.ActorSystem; import akka.actor.Props; -import junit.framework.Assert; +import akka.actor.Terminated; +import akka.dispatch.ExecutionContexts; +import akka.dispatch.Futures; +import akka.testkit.JavaTestKit; +import akka.util.Timeout; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; -import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import scala.concurrent.ExecutionContextExecutor; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; -import java.util.List; +/** + * Unit tests for DataChangeListenerRegistrationProxy. + * + * @author Thomas Pantelis + */ +public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertNotNull; -import static junit.framework.TestCase.assertTrue; + @SuppressWarnings("unchecked") + private final AsyncDataChangeListener> mockListener = + Mockito.mock(AsyncDataChangeListener.class); -public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{ + @Test + public void testGetInstance() throws Exception { + DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + "shard", Mockito.mock(ActorContext.class), mockListener); + + Assert.assertEquals(mockListener, proxy.getInstance()); + } + + @Test(timeout=10000) + public void testSuccessfulRegistration() { + new JavaTestKit(getSystem()) {{ + ActorContext actorContext = new ActorContext(getSystem(), getRef(), + mock(ClusterWrapper.class), mock(Configuration.class)); + + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + "shard-1", actorContext, mockListener); + + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; + new Thread() { + @Override + public void run() { + proxy.init(path, scope); + } + + }.start(); + + FiniteDuration timeout = duration("5 seconds"); + FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); + Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + + reply(new LocalShardFound(getRef())); + + RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class); + Assert.assertEquals("getPath", path, registerMsg.getPath()); + Assert.assertEquals("getScope", scope, registerMsg.getScope()); + + reply(new RegisterChangeListenerReply(getRef().path())); + + for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()), + proxy.getListenerRegistrationActor()); + + watch(proxy.getDataChangeListenerActor()); - private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class)); + proxy.close(); - private static class MockDataChangeListener implements - AsyncDataChangeListener> { + // The listener registration actor should get a Close message + expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS); - @Override public void onDataChanged( - AsyncDataChangeEvent> change) { - throw new UnsupportedOperationException("onDataChanged"); - } + // The DataChangeListener actor should be terminated + expectMsgClass(timeout, Terminated.class); + + proxy.close(); + + expectNoMsg(); + }}; } - @Test - public void testGetInstance() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + @Test(timeout=10000) + public void testLocalShardNotFound() { + new JavaTestKit(getSystem()) {{ + ActorContext actorContext = new ActorContext(getSystem(), getRef(), + mock(ClusterWrapper.class), mock(Configuration.class)); - MockDataChangeListener listener = - new MockDataChangeListener(); - DataChangeListenerRegistrationProxy proxy = - new DataChangeListenerRegistrationProxy( - getSystem().actorSelection(actorRef.path()), - listener, dataChangeListenerActor); + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + "shard-1", actorContext, mockListener); - Assert.assertEquals(listener, proxy.getInstance()); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; + new Thread() { + @Override + public void run() { + proxy.init(path, scope); + } + }.start(); + + FiniteDuration timeout = duration("5 seconds"); + FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); + Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + + reply(new LocalShardNotFound("shard-1")); + + expectNoMsg(duration("1 seconds")); + }}; } - @Test - public void testClose() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + @Test(timeout=10000) + public void testLocalShardNotInitialized() { + new JavaTestKit(getSystem()) {{ + ActorContext actorContext = new ActorContext(getSystem(), getRef(), + mock(ClusterWrapper.class), mock(Configuration.class)); - DataChangeListenerRegistrationProxy proxy = - new DataChangeListenerRegistrationProxy( - getSystem().actorSelection(actorRef.path()), - new MockDataChangeListener(), dataChangeListenerActor); + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + "shard-1", actorContext, mockListener); - proxy.close(); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; + new Thread() { + @Override + public void run() { + proxy.init(path, scope); + } + + }.start(); + + FiniteDuration timeout = duration("5 seconds"); + FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); + Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + + reply(new ActorNotInitialized()); + + new Within(duration("1 seconds")) { + @Override + protected void run() { + expectNoMsg(); + } + }; + }}; + } + + @Test + public void testFailedRegistration() { + new JavaTestKit(getSystem()) {{ + ActorSystem mockActorSystem = mock(ActorSystem.class); - //Check if it was received by the remote actor - ActorContext - testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration()); - Object messages = testContext - .executeOperation(actorRef, "messages"); + ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), + "testFailedRegistration"); + doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class)); + ExecutionContextExecutor executor = ExecutionContexts.fromExecutor( + MoreExecutors.sameThreadExecutor()); + doReturn(executor).when(mockActorSystem).dispatcher(); - assertNotNull(messages); + ActorContext actorContext = mock(ActorContext.class); - assertTrue(messages instanceof List); + String shardName = "shard-1"; + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + shardName, actorContext, mockListener); - List listMessages = (List) messages; + doReturn(mockActorSystem).when(actorContext).getActorSystem(); + doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); + doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); + doReturn(Futures.failed(new RuntimeException("mock"))). + when(actorContext).executeOperationAsync(any(ActorRef.class), + any(Object.class), any(Timeout.class)); + doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext(); - assertEquals(1, listMessages.size()); + proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME), + AsyncDataBroker.DataChangeScope.ONE); - assertTrue(listMessages.get(0).getClass() - .equals(CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS)); + Assert.assertEquals("getListenerRegistrationActor", null, + proxy.getListenerRegistrationActor()); + }}; } @Test - public void testCloseWhenRegistrationIsNull() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testCloseBeforeRegistration() { + new JavaTestKit(getSystem()) {{ + ActorContext actorContext = mock(ActorContext.class); - DataChangeListenerRegistrationProxy proxy = - new DataChangeListenerRegistrationProxy( - new MockDataChangeListener(), dataChangeListenerActor); + String shardName = "shard-1"; + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + shardName, actorContext, mockListener); - proxy.close(); + doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); + doReturn(getSystem()).when(actorContext).getActorSystem(); + doReturn(getSystem().actorSelection(getRef().path())). + when(actorContext).actorSelection(getRef().path()); + doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); + doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); - //Check if it was received by the remote actor - ActorContext - testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration()); - Object messages = testContext - .executeOperation(actorRef, "messages"); + Answer> answer = new Answer>() { + @Override + public Future answer(InvocationOnMock invocation) { + proxy.close(); + return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path())); + } + }; - assertNotNull(messages); + doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), + any(Object.class), any(Timeout.class)); - assertTrue(messages instanceof List); + proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME), + AsyncDataBroker.DataChangeScope.ONE); - List listMessages = (List) messages; + expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS); - assertEquals(0, listMessages.size()); + Assert.assertEquals("getListenerRegistrationActor", null, + proxy.getListenerRegistrationActor()); + }}; } }