Rename ActorContext to ActorUtils
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / ModuleShardBackendResolverTest.java
index 05145899b47ecae34adbe98e26ab81da7d541d15..f44cdf327d2d2f1a4088375545e02f00cf521e84 100644 (file)
@@ -8,16 +8,24 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
+import akka.actor.Status;
 import akka.testkit.TestProbe;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Collections;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -33,10 +41,13 @@ import org.opendaylight.controller.cluster.access.concepts.FrontendType;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import scala.concurrent.Promise;
@@ -51,6 +62,7 @@ public class ModuleShardBackendResolverTest {
     private ActorSystem system;
     private ModuleShardBackendResolver moduleShardBackendResolver;
     private TestProbe contextProbe;
+    private TestProbe shardManagerProbe;
 
     @Mock
     private ShardStrategyFactory shardStrategyFactory;
@@ -60,32 +72,37 @@ public class ModuleShardBackendResolverTest {
     private DataTree dataTree;
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         MockitoAnnotations.initMocks(this);
         system = ActorSystem.apply();
         contextProbe = new TestProbe(system, "context");
-        final ActorContext actorContext = createActorContextMock(system, contextProbe.ref());
-        moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorContext);
-        when(actorContext.getShardStrategyFactory()).thenReturn(shardStrategyFactory);
+
+        shardManagerProbe = new TestProbe(system, "ShardManager");
+
+        final ActorUtils actorUtils = createActorUtilsMock(system, contextProbe.ref());
+        when(actorUtils.getShardManager()).thenReturn(shardManagerProbe.ref());
+
+        moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorUtils);
+        when(actorUtils.getShardStrategyFactory()).thenReturn(shardStrategyFactory);
         when(shardStrategyFactory.getStrategy(YangInstanceIdentifier.EMPTY)).thenReturn(shardStrategy);
         final PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache();
-        when(actorContext.getPrimaryShardInfoCache()).thenReturn(cache);
+        when(actorUtils.getPrimaryShardInfoCache()).thenReturn(cache);
     }
 
     @After
-    public void tearDown() throws Exception {
-        JavaTestKit.shutdownActorSystem(system);
+    public void tearDown() {
+        TestKit.shutdownActorSystem(system);
     }
 
     @Test
-    public void testResolveShardForPathNonNullCookie() throws Exception {
-        when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("default");
+    public void testResolveShardForPathNonNullCookie() {
+        when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn(DefaultShardStrategy.DEFAULT_SHARD);
         final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
         Assert.assertEquals(0L, cookie.longValue());
     }
 
     @Test
-    public void testResolveShardForPathNullCookie() throws Exception {
+    public void testResolveShardForPathNullCookie() {
         when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("foo");
         final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
         Assert.assertEquals(1L, cookie.longValue());
@@ -103,23 +120,21 @@ public class ModuleShardBackendResolverTest {
         final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
         Assert.assertEquals(0L, shardBackendInfo.getCookie().longValue());
         Assert.assertEquals(dataTree, shardBackendInfo.getDataTree().get());
-        Assert.assertEquals("default", shardBackendInfo.getShardName());
+        Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getName());
     }
 
     @Test
     public void testGetBackendInfoFail() throws Exception {
         final CompletionStage<ShardBackendInfo> i = moduleShardBackendResolver.getBackendInfo(0L);
         final ConnectClientRequest req = contextProbe.expectMsgClass(ConnectClientRequest.class);
-        final RuntimeRequestException cause = new RuntimeRequestException("fail", new RuntimeException());
-        final ConnectClientFailure response =
-                req.toRequestFailure(cause);
+        final RuntimeException cause = new RuntimeException();
+        final ConnectClientFailure response = req.toRequestFailure(new RuntimeRequestException("fail", cause));
         contextProbe.reply(response);
         final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
         final ExecutionException caught =
                 TestUtils.assertOperationThrowsException(() -> TestUtils.getWithTimeout(stage.toCompletableFuture()),
                         ExecutionException.class);
-        Assert.assertNotNull(caught.getCause());
-        Assert.assertEquals(cause, caught.getCause().getCause());
+        Assert.assertEquals(cause, caught.getCause());
     }
 
     @Test
@@ -147,13 +162,36 @@ public class ModuleShardBackendResolverTest {
         Assert.assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
     }
 
-    private static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
-        final ActorContext mock = mock(ActorContext.class);
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testNotifyWhenBackendInfoIsStale() {
+        final RegisterForShardAvailabilityChanges regMessage =
+                shardManagerProbe.expectMsgClass(RegisterForShardAvailabilityChanges.class);
+        Registration mockReg = mock(Registration.class);
+        shardManagerProbe.reply(new Status.Success(mockReg));
+
+        Consumer<Long> mockCallback = mock(Consumer.class);
+        final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback);
+
+        regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
+        verify(mockCallback, timeout(5000)).accept(Long.valueOf(0));
+
+        reset(mockCallback);
+        callbackReg.close();
+
+        regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verifyNoMoreInteractions(mockCallback);
+    }
+
+    private static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
+        final ActorUtils mock = mock(ActorUtils.class);
         final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
         final ActorSelection selection = system.actorSelection(actor.path());
         final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
         promise.success(shardInfo);
-        when(mock.findPrimaryShardAsync("default")).thenReturn(promise.future());
+        when(mock.findPrimaryShardAsync(DefaultShardStrategy.DEFAULT_SHARD)).thenReturn(promise.future());
+        when(mock.getClientDispatcher()).thenReturn(system.dispatchers().defaultGlobalDispatcher());
         return mock;
     }
 }