Update sal-distributed-datastore tests a bit
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / ModuleShardBackendResolverTest.java
index bedd4a9283ec4d4710559e7f971418021f2964b7..ebd96c974187682807d122ff5d2c5484c181fbab 100644 (file)
@@ -7,23 +7,32 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import akka.actor.Status;
 import akka.testkit.TestProbe;
 import akka.testkit.javadsl.TestKit;
-import java.util.Collections;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.List;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientFailure;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
@@ -33,14 +42,18 @@ import org.opendaylight.controller.cluster.access.concepts.FrontendType;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import scala.concurrent.Promise;
 
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class ModuleShardBackendResolverTest {
 
     private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
@@ -51,6 +64,7 @@ public class ModuleShardBackendResolverTest {
     private ActorSystem system;
     private ModuleShardBackendResolver moduleShardBackendResolver;
     private TestProbe contextProbe;
+    private TestProbe shardManagerProbe;
 
     @Mock
     private ShardStrategyFactory shardStrategyFactory;
@@ -61,15 +75,19 @@ public class ModuleShardBackendResolverTest {
 
     @Before
     public void setUp() {
-        MockitoAnnotations.initMocks(this);
         system = ActorSystem.apply();
         contextProbe = new TestProbe(system, "context");
-        final ActorContext actorContext = createActorContextMock(system, contextProbe.ref());
-        moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorContext);
-        when(actorContext.getShardStrategyFactory()).thenReturn(shardStrategyFactory);
-        when(shardStrategyFactory.getStrategy(YangInstanceIdentifier.EMPTY)).thenReturn(shardStrategy);
+
+        shardManagerProbe = new TestProbe(system, "ShardManager");
+
+        final ActorUtils actorUtils = createActorUtilsMock(system, contextProbe.ref());
+        doReturn(shardManagerProbe.ref()).when(actorUtils).getShardManager();
+
+        moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorUtils);
+        doReturn(shardStrategyFactory).when(actorUtils).getShardStrategyFactory();
+        doReturn(shardStrategy).when(shardStrategyFactory).getStrategy(YangInstanceIdentifier.empty());
         final PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache();
-        when(actorContext.getPrimaryShardInfoCache()).thenReturn(cache);
+        doReturn(cache).when(actorUtils).getPrimaryShardInfoCache();
     }
 
     @After
@@ -79,16 +97,16 @@ public class ModuleShardBackendResolverTest {
 
     @Test
     public void testResolveShardForPathNonNullCookie() {
-        when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("default");
-        final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
-        Assert.assertEquals(0L, cookie.longValue());
+        doReturn(DefaultShardStrategy.DEFAULT_SHARD).when(shardStrategy).findShard(YangInstanceIdentifier.empty());
+        final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
+        assertEquals(0L, (long) cookie);
     }
 
     @Test
     public void testResolveShardForPathNullCookie() {
-        when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("foo");
-        final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
-        Assert.assertEquals(1L, cookie.longValue());
+        doReturn("foo").when(shardStrategy).findShard(YangInstanceIdentifier.empty());
+        final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
+        assertEquals(1L, (long) cookie);
     }
 
     @Test
@@ -97,13 +115,13 @@ public class ModuleShardBackendResolverTest {
         contextProbe.expectMsgClass(ConnectClientRequest.class);
         final TestProbe backendProbe = new TestProbe(system, "backend");
         final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, backendProbe.ref(),
-                Collections.emptyList(), dataTree, 3);
+                List.of(), dataTree, 3);
         contextProbe.reply(msg);
         final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
         final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
-        Assert.assertEquals(0L, shardBackendInfo.getCookie().longValue());
-        Assert.assertEquals(dataTree, shardBackendInfo.getDataTree().get());
-        Assert.assertEquals("default", shardBackendInfo.getShardName());
+        assertEquals(0L, shardBackendInfo.getCookie().longValue());
+        assertEquals(dataTree, shardBackendInfo.getDataTree().get());
+        assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getName());
     }
 
     @Test
@@ -117,7 +135,7 @@ public class ModuleShardBackendResolverTest {
         final ExecutionException caught =
                 TestUtils.assertOperationThrowsException(() -> TestUtils.getWithTimeout(stage.toCompletableFuture()),
                         ExecutionException.class);
-        Assert.assertEquals(cause, caught.getCause());
+        assertEquals(cause, caught.getCause());
     }
 
     @Test
@@ -127,7 +145,7 @@ public class ModuleShardBackendResolverTest {
         contextProbe.expectMsgClass(ConnectClientRequest.class);
         final TestProbe staleBackendProbe = new TestProbe(system, "staleBackend");
         final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, staleBackendProbe.ref(),
-                Collections.emptyList(), dataTree, 3);
+                List.of(), dataTree, 3);
         contextProbe.reply(msg);
         //get backend info
         final ShardBackendInfo staleBackendInfo = TestUtils.getWithTimeout(backendInfo.toCompletableFuture());
@@ -138,20 +156,42 @@ public class ModuleShardBackendResolverTest {
         contextProbe.expectMsgClass(ConnectClientRequest.class);
         final TestProbe refreshedBackendProbe = new TestProbe(system, "refreshedBackend");
         final ConnectClientSuccess msg2 = new ConnectClientSuccess(CLIENT_ID, 1L, refreshedBackendProbe.ref(),
-                Collections.emptyList(), dataTree, 3);
+                List.of(), dataTree, 3);
         contextProbe.reply(msg2);
         final ShardBackendInfo refreshedBackendInfo = TestUtils.getWithTimeout(refreshed.toCompletableFuture());
-        Assert.assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie());
-        Assert.assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
+        assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie());
+        assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testNotifyWhenBackendInfoIsStale() {
+        final RegisterForShardAvailabilityChanges regMessage =
+                shardManagerProbe.expectMsgClass(RegisterForShardAvailabilityChanges.class);
+        Registration mockReg = mock(Registration.class);
+        shardManagerProbe.reply(new Status.Success(mockReg));
+
+        Consumer<Long> mockCallback = mock(Consumer.class);
+        final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback);
+
+        regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
+        verify(mockCallback, timeout(5000)).accept((long) 0);
+
+        reset(mockCallback);
+        callbackReg.close();
+
+        regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verifyNoMoreInteractions(mockCallback);
     }
 
-    private static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
-        final ActorContext mock = mock(ActorContext.class);
+    private static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
+        final ActorUtils mock = mock(ActorUtils.class);
         final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
         final ActorSelection selection = system.actorSelection(actor.path());
         final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
         promise.success(shardInfo);
-        when(mock.findPrimaryShardAsync("default")).thenReturn(promise.future());
+        doReturn(promise.future()).when(mock).findPrimaryShardAsync(DefaultShardStrategy.DEFAULT_SHARD);
         return mock;
     }
 }