X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDataTreeChangeListenerProxyTest.java;fp=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDataTreeChangeListenerProxyTest.java;h=b0d38fba4750567b074aaff1ac56e4b7a483a3f1;hp=373d4d7188b155a6e6da9ba0a861cc7cf576da1f;hb=7526de25301597d670657400b541b10455311fbe;hpb=9917911b1a492b5f9fbeef1591569f7fc4a80f68 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java index 373d4d7188..b0d38fba47 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java @@ -9,10 +9,11 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -27,9 +28,14 @@ import akka.util.Timeout; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import java.time.Duration; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.eclipse.jdt.annotation.NonNullByDefault; import org.junit.Test; -import org.mockito.stubbing.Answer; +import org.mockito.ArgumentCaptor; import org.opendaylight.controller.cluster.common.actor.Dispatchers; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -42,37 +48,30 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNo import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.Future; public class DataTreeChangeListenerProxyTest extends AbstractActorTest { private final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class); @Test(timeout = 10000) public void testSuccessfulRegistration() { - final TestKit kit = new TestKit(getSystem()); - ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class), + final var kit = new TestKit(getSystem()); + final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorUtils, mockListener, path); + final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final var proxy = startProxyAsync(actorUtils, path, false); - new Thread(() -> proxy.init("shard-1")).start(); - - Duration timeout = Duration.ofSeconds(5); - FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); - assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + final var timeout = Duration.ofSeconds(5); + final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); + assertEquals("shard-1", findLocalShard.getShardName()); kit.reply(new LocalShardFound(kit.getRef())); - RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout, - RegisterDataTreeChangeListener.class); - assertEquals("getPath", path, registerMsg.getPath()); - assertFalse("isRegisterOnAllInstances", registerMsg.isRegisterOnAllInstances()); + final var registerMsg = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class); + assertEquals(path, registerMsg.getPath()); + assertFalse(registerMsg.isRegisterOnAllInstances()); kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef())); @@ -80,8 +79,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } - assertEquals("getListenerRegistrationActor", getSystem().actorSelection(kit.getRef().path()), - proxy.getListenerRegistrationActor()); + assertEquals(getSystem().actorSelection(kit.getRef().path()), proxy.getListenerRegistrationActor()); kit.watch(proxy.getDataChangeListenerActor()); @@ -100,48 +98,38 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { @Test(timeout = 10000) public void testSuccessfulRegistrationForClusteredListener() { - final TestKit kit = new TestKit(getSystem()); - ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class), + final var kit = new TestKit(getSystem()); + final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); - ClusteredDOMDataTreeChangeListener mockClusteredListener = mock( - ClusteredDOMDataTreeChangeListener.class); - - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataTreeChangeListenerProxy proxy = - new DataTreeChangeListenerProxy<>(actorUtils, mockClusteredListener, path); + final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final var proxy = startProxyAsync(actorUtils, path, true); - new Thread(() -> proxy.init("shard-1")).start(); - - Duration timeout = Duration.ofSeconds(5); - FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); - assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + final var timeout = Duration.ofSeconds(5); + final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); + assertEquals("shard-1", findLocalShard.getShardName()); kit.reply(new LocalShardFound(kit.getRef())); - RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout, - RegisterDataTreeChangeListener.class); - assertEquals("getPath", path, registerMsg.getPath()); - assertTrue("isRegisterOnAllInstances", registerMsg.isRegisterOnAllInstances()); + final var registerMsg = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class); + assertEquals(path, registerMsg.getPath()); + assertTrue(registerMsg.isRegisterOnAllInstances()); proxy.close(); } @Test(timeout = 10000) public void testLocalShardNotFound() { - final TestKit kit = new TestKit(getSystem()); - ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class), + final var kit = new TestKit(getSystem()); + final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorUtils, mockListener, path); - - new Thread(() -> proxy.init("shard-1")).start(); + final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final var proxy = startProxyAsync(actorUtils, path, true); - Duration timeout = Duration.ofSeconds(5); - FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); - assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + final var timeout = Duration.ofSeconds(5); + final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); + assertEquals("shard-1", findLocalShard.getShardName()); kit.reply(new LocalShardNotFound("shard-1")); @@ -152,19 +140,16 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { @Test(timeout = 10000) public void testLocalShardNotInitialized() { - final TestKit kit = new TestKit(getSystem()); - ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class), + final var kit = new TestKit(getSystem()); + final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); - final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorUtils, mockListener, path); + final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final var proxy = startProxyAsync(actorUtils, path, false); - new Thread(() -> proxy.init("shard-1")).start(); - - Duration timeout = Duration.ofSeconds(5); - FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); - assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + final var timeout = Duration.ofSeconds(5); + final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class); + assertEquals("shard-1", findLocalShard.getShardName()); kit.reply(new NotInitializedException("not initialized")); @@ -178,43 +163,35 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { @Test public void testFailedRegistration() { - final TestKit kit = new TestKit(getSystem()); - ActorSystem mockActorSystem = mock(ActorSystem.class); + final var kit = new TestKit(getSystem()); + final var mockActorSystem = mock(ActorSystem.class); - ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration"); + final var mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration"); doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class)); - ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()); + final var executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()); - ActorUtils actorUtils = mock(ActorUtils.class); - final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + final var actorUtils = mock(ActorUtils.class); + final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); doReturn(executor).when(actorUtils).getClientDispatcher(); doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext(); doReturn(mockActorSystem).when(actorUtils).getActorSystem(); - String shardName = "shard-1"; - final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorUtils, mockListener, path); - doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration(); - doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync(eq(shardName)); + doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync("shard-1"); doReturn(Futures.failed(new RuntimeException("mock"))).when(actorUtils).executeOperationAsync( any(ActorRef.class), any(Object.class), any(Timeout.class)); - doReturn(mock(DatastoreContext.class)).when(actorUtils).getDatastoreContext(); - - proxy.init("shard-1"); - assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); + final var proxy = DataTreeChangeListenerProxy.of(actorUtils, mockListener, path, true, "shard-1"); + assertNull(proxy.getListenerRegistrationActor()); proxy.close(); } @Test public void testCloseBeforeRegistration() { - final TestKit kit = new TestKit(getSystem()); - ActorUtils actorUtils = mock(ActorUtils.class); - - String shardName = "shard-1"; + final var kit = new TestKit(getSystem()); + final var actorUtils = mock(ActorUtils.class); doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext(); doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorUtils).getClientDispatcher(); @@ -223,23 +200,46 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { doReturn(getSystem().actorSelection(kit.getRef().path())).when(actorUtils).actorSelection( kit.getRef().path()); doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration(); - doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync(eq(shardName)); + doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync("shard-1"); - final DataTreeChangeListenerProxy proxy = new DataTreeChangeListenerProxy<>( - actorUtils, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME)); + final var proxy = createProxy(actorUtils, YangInstanceIdentifier.of(TestModel.TEST_QNAME), true); + final var instance = proxy.getKey(); - Answer> answer = invocation -> { - proxy.close(); - return Futures.successful((Object) new RegisterDataTreeNotificationListenerReply(kit.getRef())); - }; + doAnswer(invocation -> { + instance.close(); + return Futures.successful(new RegisterDataTreeNotificationListenerReply(kit.getRef())); + }).when(actorUtils).executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class)); + proxy.getValue().run(); - doAnswer(answer).when(actorUtils).executeOperationAsync(any(ActorRef.class), any(Object.class), - any(Timeout.class)); + kit.expectMsgClass(Duration.ofSeconds(5), CloseDataTreeNotificationListenerRegistration.class); - proxy.init(shardName); + assertNull(instance.getListenerRegistrationActor()); + } - kit.expectMsgClass(Duration.ofSeconds(5), CloseDataTreeNotificationListenerRegistration.class); + @NonNullByDefault + private DataTreeChangeListenerProxy startProxyAsync(final ActorUtils actorUtils, final YangInstanceIdentifier path, + final boolean clustered) { + return startProxyAsync(actorUtils, path, clustered, Runnable::run); + } + + @NonNullByDefault + private DataTreeChangeListenerProxy startProxyAsync(final ActorUtils actorUtils, final YangInstanceIdentifier path, + final boolean clustered, final Consumer execute) { + final var proxy = createProxy(actorUtils, path, clustered); + final var thread = new Thread(proxy.getValue()); + thread.setDaemon(true); + thread.start(); + return proxy.getKey(); + } - assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor()); + @NonNullByDefault + private Entry createProxy(final ActorUtils actorUtils, + final YangInstanceIdentifier path, final boolean clustered) { + final var executor = mock(Executor.class); + final var captor = ArgumentCaptor.forClass(Runnable.class); + doNothing().when(executor).execute(captor.capture()); + final var proxy = DataTreeChangeListenerProxy.ofTesting(actorUtils, mockListener, path, clustered, "shard-1", + executor); + return Map.entry(proxy, captor.getValue()); } }