X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolverTest.java;h=ebd96c974187682807d122ff5d2c5484c181fbab;hp=bedd4a9283ec4d4710559e7f971418021f2964b7;hb=514df5b3d350482fee8ee8f1f5b257c229f7e61a;hpb=12fcdfe39aa26dcba7fd3bb4d4c68e3d02e65c51 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolverTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolverTest.java index bedd4a9283..ebd96c9741 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolverTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolverTest.java @@ -7,23 +7,32 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.actor.Status; import akka.testkit.TestProbe; import akka.testkit.javadsl.TestKit; -import java.util.Collections; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; +import org.mockito.junit.MockitoJUnitRunner; import org.opendaylight.controller.cluster.access.commands.ConnectClientFailure; import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; @@ -33,14 +42,18 @@ import org.opendaylight.controller.cluster.access.concepts.FrontendType; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; +import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import scala.concurrent.Promise; +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class ModuleShardBackendResolverTest { private static final MemberName MEMBER_NAME = MemberName.forName("member-1"); @@ -51,6 +64,7 @@ public class ModuleShardBackendResolverTest { private ActorSystem system; private ModuleShardBackendResolver moduleShardBackendResolver; private TestProbe contextProbe; + private TestProbe shardManagerProbe; @Mock private ShardStrategyFactory shardStrategyFactory; @@ -61,15 +75,19 @@ public class ModuleShardBackendResolverTest { @Before public void setUp() { - MockitoAnnotations.initMocks(this); system = ActorSystem.apply(); contextProbe = new TestProbe(system, "context"); - final ActorContext actorContext = createActorContextMock(system, contextProbe.ref()); - moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorContext); - when(actorContext.getShardStrategyFactory()).thenReturn(shardStrategyFactory); - when(shardStrategyFactory.getStrategy(YangInstanceIdentifier.EMPTY)).thenReturn(shardStrategy); + + shardManagerProbe = new TestProbe(system, "ShardManager"); + + final ActorUtils actorUtils = createActorUtilsMock(system, contextProbe.ref()); + doReturn(shardManagerProbe.ref()).when(actorUtils).getShardManager(); + + moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorUtils); + doReturn(shardStrategyFactory).when(actorUtils).getShardStrategyFactory(); + doReturn(shardStrategy).when(shardStrategyFactory).getStrategy(YangInstanceIdentifier.empty()); final PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache(); - when(actorContext.getPrimaryShardInfoCache()).thenReturn(cache); + doReturn(cache).when(actorUtils).getPrimaryShardInfoCache(); } @After @@ -79,16 +97,16 @@ public class ModuleShardBackendResolverTest { @Test public void testResolveShardForPathNonNullCookie() { - when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("default"); - final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY); - Assert.assertEquals(0L, cookie.longValue()); + doReturn(DefaultShardStrategy.DEFAULT_SHARD).when(shardStrategy).findShard(YangInstanceIdentifier.empty()); + final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty()); + assertEquals(0L, (long) cookie); } @Test public void testResolveShardForPathNullCookie() { - when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("foo"); - final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY); - Assert.assertEquals(1L, cookie.longValue()); + doReturn("foo").when(shardStrategy).findShard(YangInstanceIdentifier.empty()); + final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty()); + assertEquals(1L, (long) cookie); } @Test @@ -97,13 +115,13 @@ public class ModuleShardBackendResolverTest { contextProbe.expectMsgClass(ConnectClientRequest.class); final TestProbe backendProbe = new TestProbe(system, "backend"); final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, backendProbe.ref(), - Collections.emptyList(), dataTree, 3); + List.of(), dataTree, 3); contextProbe.reply(msg); final CompletionStage stage = moduleShardBackendResolver.getBackendInfo(0L); final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture()); - Assert.assertEquals(0L, shardBackendInfo.getCookie().longValue()); - Assert.assertEquals(dataTree, shardBackendInfo.getDataTree().get()); - Assert.assertEquals("default", shardBackendInfo.getShardName()); + assertEquals(0L, shardBackendInfo.getCookie().longValue()); + assertEquals(dataTree, shardBackendInfo.getDataTree().get()); + assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getName()); } @Test @@ -117,7 +135,7 @@ public class ModuleShardBackendResolverTest { final ExecutionException caught = TestUtils.assertOperationThrowsException(() -> TestUtils.getWithTimeout(stage.toCompletableFuture()), ExecutionException.class); - Assert.assertEquals(cause, caught.getCause()); + assertEquals(cause, caught.getCause()); } @Test @@ -127,7 +145,7 @@ public class ModuleShardBackendResolverTest { contextProbe.expectMsgClass(ConnectClientRequest.class); final TestProbe staleBackendProbe = new TestProbe(system, "staleBackend"); final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, staleBackendProbe.ref(), - Collections.emptyList(), dataTree, 3); + List.of(), dataTree, 3); contextProbe.reply(msg); //get backend info final ShardBackendInfo staleBackendInfo = TestUtils.getWithTimeout(backendInfo.toCompletableFuture()); @@ -138,20 +156,42 @@ public class ModuleShardBackendResolverTest { contextProbe.expectMsgClass(ConnectClientRequest.class); final TestProbe refreshedBackendProbe = new TestProbe(system, "refreshedBackend"); final ConnectClientSuccess msg2 = new ConnectClientSuccess(CLIENT_ID, 1L, refreshedBackendProbe.ref(), - Collections.emptyList(), dataTree, 3); + List.of(), dataTree, 3); contextProbe.reply(msg2); final ShardBackendInfo refreshedBackendInfo = TestUtils.getWithTimeout(refreshed.toCompletableFuture()); - Assert.assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie()); - Assert.assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor()); + assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie()); + assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor()); + } + + @SuppressWarnings("unchecked") + @Test + public void testNotifyWhenBackendInfoIsStale() { + final RegisterForShardAvailabilityChanges regMessage = + shardManagerProbe.expectMsgClass(RegisterForShardAvailabilityChanges.class); + Registration mockReg = mock(Registration.class); + shardManagerProbe.reply(new Status.Success(mockReg)); + + Consumer mockCallback = mock(Consumer.class); + final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback); + + regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD); + verify(mockCallback, timeout(5000)).accept((long) 0); + + reset(mockCallback); + callbackReg.close(); + + regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD); + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verifyNoMoreInteractions(mockCallback); } - private static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) { - final ActorContext mock = mock(ActorContext.class); + private static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) { + final ActorUtils mock = mock(ActorUtils.class); final Promise promise = new scala.concurrent.impl.Promise.DefaultPromise<>(); final ActorSelection selection = system.actorSelection(actor.path()); final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0); promise.success(shardInfo); - when(mock.findPrimaryShardAsync("default")).thenReturn(promise.future()); + doReturn(promise.future()).when(mock).findPrimaryShardAsync(DefaultShardStrategy.DEFAULT_SHARD); return mock; } }