Bump upstreams
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DataTreeChangeListenerProxyTest.java
index 373d4d7188b155a6e6da9ba0a861cc7cf576da1f..b0d38fba4750567b074aaff1ac56e4b7a483a3f1 100644 (file)
@@ -9,10 +9,11 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
@@ -27,9 +28,14 @@ import akka.util.Timeout;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.time.Duration;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.junit.Test;
-import org.mockito.stubbing.Answer;
+import org.mockito.ArgumentCaptor;
 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -42,37 +48,30 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNo
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future;
 
 public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
     private final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class);
 
     @Test(timeout = 10000)
     public void testSuccessfulRegistration() {
-        final TestKit kit = new TestKit(getSystem());
-        ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
+        final var kit = new TestKit(getSystem());
+        final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
             mock(Configuration.class));
 
-        final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
-        final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                actorUtils, mockListener, path);
+        final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+        final var proxy = startProxyAsync(actorUtils, path, false);
 
-        new Thread(() -> proxy.init("shard-1")).start();
-
-        Duration timeout = Duration.ofSeconds(5);
-        FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
-        assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+        final var timeout = Duration.ofSeconds(5);
+        final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
+        assertEquals("shard-1", findLocalShard.getShardName());
 
         kit.reply(new LocalShardFound(kit.getRef()));
 
-        RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout,
-            RegisterDataTreeChangeListener.class);
-        assertEquals("getPath", path, registerMsg.getPath());
-        assertFalse("isRegisterOnAllInstances", registerMsg.isRegisterOnAllInstances());
+        final var registerMsg = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
+        assertEquals(path, registerMsg.getPath());
+        assertFalse(registerMsg.isRegisterOnAllInstances());
 
         kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
 
@@ -80,8 +79,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
         }
 
-        assertEquals("getListenerRegistrationActor", getSystem().actorSelection(kit.getRef().path()),
-            proxy.getListenerRegistrationActor());
+        assertEquals(getSystem().actorSelection(kit.getRef().path()), proxy.getListenerRegistrationActor());
 
         kit.watch(proxy.getDataChangeListenerActor());
 
@@ -100,48 +98,38 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
 
     @Test(timeout = 10000)
     public void testSuccessfulRegistrationForClusteredListener() {
-        final TestKit kit = new TestKit(getSystem());
-        ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
+        final var kit = new TestKit(getSystem());
+        final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
             mock(Configuration.class));
 
-        ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
-            ClusteredDOMDataTreeChangeListener.class);
-
-        final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
-        final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
-                new DataTreeChangeListenerProxy<>(actorUtils, mockClusteredListener, path);
+        final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+        final var proxy = startProxyAsync(actorUtils, path, true);
 
-        new Thread(() -> proxy.init("shard-1")).start();
-
-        Duration timeout = Duration.ofSeconds(5);
-        FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
-        assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+        final var timeout = Duration.ofSeconds(5);
+        final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
+        assertEquals("shard-1", findLocalShard.getShardName());
 
         kit.reply(new LocalShardFound(kit.getRef()));
 
-        RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout,
-            RegisterDataTreeChangeListener.class);
-        assertEquals("getPath", path, registerMsg.getPath());
-        assertTrue("isRegisterOnAllInstances", registerMsg.isRegisterOnAllInstances());
+        final var registerMsg = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
+        assertEquals(path, registerMsg.getPath());
+        assertTrue(registerMsg.isRegisterOnAllInstances());
 
         proxy.close();
     }
 
     @Test(timeout = 10000)
     public void testLocalShardNotFound() {
-        final TestKit kit = new TestKit(getSystem());
-        ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
+        final var kit = new TestKit(getSystem());
+        final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
             mock(Configuration.class));
 
-        final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
-        final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                actorUtils, mockListener, path);
-
-        new Thread(() -> proxy.init("shard-1")).start();
+        final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+        final var proxy = startProxyAsync(actorUtils, path, true);
 
-        Duration timeout = Duration.ofSeconds(5);
-        FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
-        assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+        final var timeout = Duration.ofSeconds(5);
+        final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
+        assertEquals("shard-1", findLocalShard.getShardName());
 
         kit.reply(new LocalShardNotFound("shard-1"));
 
@@ -152,19 +140,16 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
 
     @Test(timeout = 10000)
     public void testLocalShardNotInitialized() {
-        final TestKit kit = new TestKit(getSystem());
-        ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
+        final var kit = new TestKit(getSystem());
+        final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
             mock(Configuration.class));
 
-        final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
-        final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                actorUtils, mockListener, path);
+        final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+        final var proxy = startProxyAsync(actorUtils, path, false);
 
-        new Thread(() -> proxy.init("shard-1")).start();
-
-        Duration timeout = Duration.ofSeconds(5);
-        FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
-        assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+        final var timeout = Duration.ofSeconds(5);
+        final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
+        assertEquals("shard-1", findLocalShard.getShardName());
 
         kit.reply(new NotInitializedException("not initialized"));
 
@@ -178,43 +163,35 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
 
     @Test
     public void testFailedRegistration() {
-        final TestKit kit = new TestKit(getSystem());
-        ActorSystem mockActorSystem = mock(ActorSystem.class);
+        final var kit = new TestKit(getSystem());
+        final var mockActorSystem = mock(ActorSystem.class);
 
-        ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration");
+        final var mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration");
         doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
-        ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
+        final var executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
 
-        ActorUtils actorUtils = mock(ActorUtils.class);
-        final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+        final var actorUtils = mock(ActorUtils.class);
+        final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
 
         doReturn(executor).when(actorUtils).getClientDispatcher();
         doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
         doReturn(mockActorSystem).when(actorUtils).getActorSystem();
 
-        String shardName = "shard-1";
-        final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                actorUtils, mockListener, path);
-
         doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
-        doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync(eq(shardName));
+        doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync("shard-1");
         doReturn(Futures.failed(new RuntimeException("mock"))).when(actorUtils).executeOperationAsync(
             any(ActorRef.class), any(Object.class), any(Timeout.class));
-        doReturn(mock(DatastoreContext.class)).when(actorUtils).getDatastoreContext();
-
-        proxy.init("shard-1");
 
-        assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
+        final var proxy = DataTreeChangeListenerProxy.of(actorUtils, mockListener, path, true, "shard-1");
+        assertNull(proxy.getListenerRegistrationActor());
 
         proxy.close();
     }
 
     @Test
     public void testCloseBeforeRegistration() {
-        final TestKit kit = new TestKit(getSystem());
-        ActorUtils actorUtils = mock(ActorUtils.class);
-
-        String shardName = "shard-1";
+        final var kit = new TestKit(getSystem());
+        final var actorUtils = mock(ActorUtils.class);
 
         doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorUtils).getClientDispatcher();
@@ -223,23 +200,46 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
         doReturn(getSystem().actorSelection(kit.getRef().path())).when(actorUtils).actorSelection(
             kit.getRef().path());
         doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
-        doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync(eq(shardName));
+        doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync("shard-1");
 
-        final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
-                actorUtils, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
+        final var proxy = createProxy(actorUtils, YangInstanceIdentifier.of(TestModel.TEST_QNAME), true);
+        final var instance = proxy.getKey();
 
-        Answer<Future<Object>> answer = invocation -> {
-            proxy.close();
-            return Futures.successful((Object) new RegisterDataTreeNotificationListenerReply(kit.getRef()));
-        };
+        doAnswer(invocation -> {
+            instance.close();
+            return Futures.successful(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
+        }).when(actorUtils).executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class));
+        proxy.getValue().run();
 
-        doAnswer(answer).when(actorUtils).executeOperationAsync(any(ActorRef.class), any(Object.class),
-            any(Timeout.class));
+        kit.expectMsgClass(Duration.ofSeconds(5), CloseDataTreeNotificationListenerRegistration.class);
 
-        proxy.init(shardName);
+        assertNull(instance.getListenerRegistrationActor());
+    }
 
-        kit.expectMsgClass(Duration.ofSeconds(5), CloseDataTreeNotificationListenerRegistration.class);
+    @NonNullByDefault
+    private DataTreeChangeListenerProxy startProxyAsync(final ActorUtils actorUtils, final YangInstanceIdentifier path,
+            final boolean clustered) {
+        return startProxyAsync(actorUtils, path, clustered, Runnable::run);
+    }
+
+    @NonNullByDefault
+    private DataTreeChangeListenerProxy startProxyAsync(final ActorUtils actorUtils, final YangInstanceIdentifier path,
+            final boolean clustered, final Consumer<Runnable> execute) {
+        final var proxy = createProxy(actorUtils, path, clustered);
+        final var thread = new Thread(proxy.getValue());
+        thread.setDaemon(true);
+        thread.start();
+        return proxy.getKey();
+    }
 
-        assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
+    @NonNullByDefault
+    private Entry<DataTreeChangeListenerProxy, Runnable> createProxy(final ActorUtils actorUtils,
+            final YangInstanceIdentifier path, final boolean clustered) {
+        final var executor = mock(Executor.class);
+        final var captor = ArgumentCaptor.forClass(Runnable.class);
+        doNothing().when(executor).execute(captor.capture());
+        final var proxy = DataTreeChangeListenerProxy.ofTesting(actorUtils, mockListener, path, clustered, "shard-1",
+            executor);
+        return Map.entry(proxy, captor.getValue());
     }
 }