X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDataChangeListenerRegistrationProxyTest.java;h=26b9aa3e94d7c94b8fc92036cbb05fa02b7b68c2;hb=f41c5e6e6f6e10b36b1e4b1992877e38e718c8fb;hp=b5567fa1d59b247d2f28c4804989e0c6b95a495b;hpb=4aafd13eccad285b5bb3ee277b0fb0b8721612bc;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index b5567fa1d5..26b9aa3e94 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -12,13 +12,14 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; + import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.Terminated; import akka.dispatch.ExecutionContexts; import akka.dispatch.Futures; -import akka.testkit.JavaTestKit; +import akka.testkit.javadsl.TestKit; import akka.util.Timeout; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -26,18 +27,17 @@ import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.common.actor.Dispatchers; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; -import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; -import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; @@ -63,260 +63,244 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest { @Test public void testGetInstance() throws Exception { - DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( - "shard", Mockito.mock(ActorContext.class), mockListener); - - Assert.assertEquals(mockListener, proxy.getInstance()); + try (DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + "shard", Mockito.mock(ActorContext.class), mockListener)) { + Assert.assertEquals(mockListener, proxy.getInstance()); + } } - @Test(timeout=10000) + @Test(timeout = 10000) public void testSuccessfulRegistration() { - new JavaTestKit(getSystem()) {{ - ActorContext actorContext = new ActorContext(getSystem(), getRef(), - mock(ClusterWrapper.class), mock(Configuration.class)); + new TestKit(getSystem()) { + { + ActorContext actorContext = new ActorContext(getSystem(), getRef(), + mock(ClusterWrapper.class), mock(Configuration.class)); - final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( - "shard-1", actorContext, mockListener); + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + "shard-1", actorContext, mockListener); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; - new Thread() { - @Override - public void run() { - proxy.init(path, scope); - } + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; + new Thread(() -> proxy.init(path, scope)).start(); - }.start(); + FiniteDuration timeout = duration("5 seconds"); + FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); + Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - FiniteDuration timeout = duration("5 seconds"); - FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); - Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + reply(new LocalShardFound(getRef())); - reply(new LocalShardFound(getRef())); + RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class); + Assert.assertEquals("getPath", path, registerMsg.getPath()); + Assert.assertEquals("getScope", scope, registerMsg.getScope()); + Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances()); - RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class); - Assert.assertEquals("getPath", path, registerMsg.getPath()); - Assert.assertEquals("getScope", scope, registerMsg.getScope()); - Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances()); + reply(new RegisterDataTreeNotificationListenerReply(getRef())); - reply(new RegisterChangeListenerReply(getRef())); - - for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) { - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - } + for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } - Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()), - proxy.getListenerRegistrationActor()); + Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()), + proxy.getListenerRegistrationActor()); - watch(proxy.getDataChangeListenerActor()); + watch(proxy.getDataChangeListenerActor()); - proxy.close(); + proxy.close(); - // The listener registration actor should get a Close message - expectMsgClass(timeout, CloseDataChangeListenerRegistration.class); + // The listener registration actor should get a Close message + expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class); - // The DataChangeListener actor should be terminated - expectMsgClass(timeout, Terminated.class); + // The DataChangeListener actor should be terminated + expectMsgClass(timeout, Terminated.class); - proxy.close(); + proxy.close(); - expectNoMsg(); - }}; + expectNoMsg(); + } + }; } - @Test(timeout=10000) + @Test(timeout = 10000) public void testSuccessfulRegistrationForClusteredListener() { - new JavaTestKit(getSystem()) {{ - ActorContext actorContext = new ActorContext(getSystem(), getRef(), - mock(ClusterWrapper.class), mock(Configuration.class)); - - AsyncDataChangeListener> mockClusteredListener = - Mockito.mock(ClusteredDOMDataChangeListener.class); - - final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( - "shard-1", actorContext, mockClusteredListener); - - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; - new Thread() { - @Override - public void run() { - proxy.init(path, scope); - } + new TestKit(getSystem()) { + { + ActorContext actorContext = new ActorContext(getSystem(), getRef(), + mock(ClusterWrapper.class), mock(Configuration.class)); - }.start(); + AsyncDataChangeListener> mockClusteredListener = + Mockito.mock(ClusteredDOMDataChangeListener.class); - FiniteDuration timeout = duration("5 seconds"); - FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); - Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + "shard-1", actorContext, mockClusteredListener); - reply(new LocalShardFound(getRef())); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; + new Thread(() -> proxy.init(path, scope)).start(); - RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class); - Assert.assertEquals("getPath", path, registerMsg.getPath()); - Assert.assertEquals("getScope", scope, registerMsg.getScope()); - Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances()); + FiniteDuration timeout = duration("5 seconds"); + FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); + Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - reply(new RegisterChangeListenerReply(getRef())); + reply(new LocalShardFound(getRef())); - for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) { - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - } + RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class); + Assert.assertEquals("getPath", path, registerMsg.getPath()); + Assert.assertEquals("getScope", scope, registerMsg.getScope()); + Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances()); + + reply(new RegisterDataTreeNotificationListenerReply(getRef())); + + for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } - Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()), - proxy.getListenerRegistrationActor()); + Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()), + proxy.getListenerRegistrationActor()); - watch(proxy.getDataChangeListenerActor()); + watch(proxy.getDataChangeListenerActor()); - proxy.close(); + proxy.close(); - // The listener registration actor should get a Close message - expectMsgClass(timeout, CloseDataChangeListenerRegistration.class); + // The listener registration actor should get a Close message + expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class); - // The DataChangeListener actor should be terminated - expectMsgClass(timeout, Terminated.class); + // The DataChangeListener actor should be terminated + expectMsgClass(timeout, Terminated.class); - proxy.close(); + proxy.close(); - expectNoMsg(); - }}; + expectNoMsg(); + } + }; } - @Test(timeout=10000) + @Test(timeout = 10000) public void testLocalShardNotFound() { - new JavaTestKit(getSystem()) {{ - ActorContext actorContext = new ActorContext(getSystem(), getRef(), - mock(ClusterWrapper.class), mock(Configuration.class)); + new TestKit(getSystem()) { + { + ActorContext actorContext = new ActorContext(getSystem(), getRef(), + mock(ClusterWrapper.class), mock(Configuration.class)); - final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( - "shard-1", actorContext, mockListener); + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + "shard-1", actorContext, mockListener); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; - new Thread() { - @Override - public void run() { - proxy.init(path, scope); - } + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; + new Thread(() -> proxy.init(path, scope)).start(); - }.start(); + FiniteDuration timeout = duration("5 seconds"); + FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); + Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - FiniteDuration timeout = duration("5 seconds"); - FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); - Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + reply(new LocalShardNotFound("shard-1")); - reply(new LocalShardNotFound("shard-1")); + expectNoMsg(duration("1 seconds")); - expectNoMsg(duration("1 seconds")); - }}; + proxy.close(); + } + }; } - @Test(timeout=10000) + @Test(timeout = 10000) public void testLocalShardNotInitialized() { - new JavaTestKit(getSystem()) {{ - ActorContext actorContext = new ActorContext(getSystem(), getRef(), - mock(ClusterWrapper.class), mock(Configuration.class)); - - final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( - "shard-1", actorContext, mockListener); + new TestKit(getSystem()) { + { + ActorContext actorContext = new ActorContext(getSystem(), getRef(), + mock(ClusterWrapper.class), mock(Configuration.class)); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; - new Thread() { - @Override - public void run() { - proxy.init(path, scope); - } + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + "shard-1", actorContext, mockListener); - }.start(); + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE; + new Thread(() -> proxy.init(path, scope)).start(); - FiniteDuration timeout = duration("5 seconds"); - FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); - Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + FiniteDuration timeout = duration("5 seconds"); + FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); + Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); - reply(new NotInitializedException("not initialized")); + reply(new NotInitializedException("not initialized")); - new Within(duration("1 seconds")) { - @Override - protected void run() { - expectNoMsg(); - } - }; - }}; + expectNoMsg(duration("1 seconds")); + proxy.close(); + } + }; } @Test public void testFailedRegistration() { - new JavaTestKit(getSystem()) {{ - ActorSystem mockActorSystem = mock(ActorSystem.class); + new TestKit(getSystem()) { + { + ActorSystem mockActorSystem = mock(ActorSystem.class); + + ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), + "testFailedRegistration"); + doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class)); + ExecutionContextExecutor executor = ExecutionContexts.fromExecutor( + MoreExecutors.directExecutor()); - ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), - "testFailedRegistration"); - doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class)); - ExecutionContextExecutor executor = ExecutionContexts.fromExecutor( - MoreExecutors.directExecutor()); + ActorContext actorContext = mock(ActorContext.class); - ActorContext actorContext = mock(ActorContext.class); + doReturn(executor).when(actorContext).getClientDispatcher(); - doReturn(executor).when(actorContext).getClientDispatcher(); + String shardName = "shard-1"; + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + shardName, actorContext, mockListener); - String shardName = "shard-1"; - final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( - shardName, actorContext, mockListener); + doReturn(mockActorSystem).when(actorContext).getActorSystem(); + doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); + doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); + doReturn(Futures.failed(new RuntimeException("mock"))) + .when(actorContext).executeOperationAsync(any(ActorRef.class), + any(Object.class), any(Timeout.class)); + doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext(); - doReturn(mockActorSystem).when(actorContext).getActorSystem(); - doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); - doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); - doReturn(Futures.failed(new RuntimeException("mock"))). - when(actorContext).executeOperationAsync(any(ActorRef.class), - any(Object.class), any(Timeout.class)); - doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext(); + proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME), + AsyncDataBroker.DataChangeScope.ONE); - proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME), - AsyncDataBroker.DataChangeScope.ONE); + Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); - Assert.assertEquals("getListenerRegistrationActor", null, - proxy.getListenerRegistrationActor()); - }}; + proxy.close(); + } + }; } @Test public void testCloseBeforeRegistration() { - new JavaTestKit(getSystem()) {{ - ActorContext actorContext = mock(ActorContext.class); - - String shardName = "shard-1"; - final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( - shardName, actorContext, mockListener); - - doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); - doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); - doReturn(getSystem()).when(actorContext).getActorSystem(); - doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath(); - doReturn(getSystem().actorSelection(getRef().path())). - when(actorContext).actorSelection(getRef().path()); - doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); - doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); - - Answer> answer = new Answer>() { - @Override - public Future answer(InvocationOnMock invocation) { + new TestKit(getSystem()) { + { + ActorContext actorContext = mock(ActorContext.class); + + String shardName = "shard-1"; + final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( + shardName, actorContext, mockListener); + + doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher(); + doReturn(getSystem()).when(actorContext).getActorSystem(); + doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath(); + doReturn(getSystem().actorSelection(getRef().path())) + .when(actorContext).actorSelection(getRef().path()); + doReturn(duration("5 seconds")).when(actorContext).getOperationDuration(); + doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName)); + + Answer> answer = invocation -> { proxy.close(); - return Futures.successful((Object)new RegisterChangeListenerReply(getRef())); - } - }; + return Futures.successful((Object)new RegisterDataTreeNotificationListenerReply(getRef())); + }; - doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), - any(Object.class), any(Timeout.class)); + doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class), + any(Object.class), any(Timeout.class)); - proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME), - AsyncDataBroker.DataChangeScope.ONE); + proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME), + AsyncDataBroker.DataChangeScope.ONE); - expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.class); + expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class); - Assert.assertEquals("getListenerRegistrationActor", null, - proxy.getListenerRegistrationActor()); - }}; + Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); + proxy.close(); + } + }; } }