*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.actor.Status;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
-import java.util.Collections;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.controller.cluster.access.commands.ConnectClientFailure;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import scala.concurrent.Promise;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ModuleShardBackendResolverTest {
private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
private ActorSystem system;
private ModuleShardBackendResolver moduleShardBackendResolver;
private TestProbe contextProbe;
+ private TestProbe shardManagerProbe;
@Mock
private ShardStrategyFactory shardStrategyFactory;
@Before
public void setUp() {
- MockitoAnnotations.initMocks(this);
system = ActorSystem.apply();
contextProbe = new TestProbe(system, "context");
- final ActorContext actorContext = createActorContextMock(system, contextProbe.ref());
- moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorContext);
- when(actorContext.getShardStrategyFactory()).thenReturn(shardStrategyFactory);
- when(shardStrategyFactory.getStrategy(YangInstanceIdentifier.EMPTY)).thenReturn(shardStrategy);
+
+ shardManagerProbe = new TestProbe(system, "ShardManager");
+
+ final ActorUtils actorUtils = createActorUtilsMock(system, contextProbe.ref());
+ doReturn(shardManagerProbe.ref()).when(actorUtils).getShardManager();
+
+ moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorUtils);
+ doReturn(shardStrategyFactory).when(actorUtils).getShardStrategyFactory();
+ doReturn(shardStrategy).when(shardStrategyFactory).getStrategy(YangInstanceIdentifier.empty());
final PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache();
- when(actorContext.getPrimaryShardInfoCache()).thenReturn(cache);
+ doReturn(cache).when(actorUtils).getPrimaryShardInfoCache();
}
@After
@Test
public void testResolveShardForPathNonNullCookie() {
- when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("default");
- final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
- Assert.assertEquals(0L, cookie.longValue());
+ doReturn(DefaultShardStrategy.DEFAULT_SHARD).when(shardStrategy).findShard(YangInstanceIdentifier.empty());
+ final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
+ assertEquals(0L, (long) cookie);
}
@Test
public void testResolveShardForPathNullCookie() {
- when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("foo");
- final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
- Assert.assertEquals(1L, cookie.longValue());
+ doReturn("foo").when(shardStrategy).findShard(YangInstanceIdentifier.empty());
+ final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
+ assertEquals(1L, (long) cookie);
}
@Test
contextProbe.expectMsgClass(ConnectClientRequest.class);
final TestProbe backendProbe = new TestProbe(system, "backend");
final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, backendProbe.ref(),
- Collections.emptyList(), dataTree, 3);
+ List.of(), dataTree, 3);
contextProbe.reply(msg);
final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
- Assert.assertEquals(0L, shardBackendInfo.getCookie().longValue());
- Assert.assertEquals(dataTree, shardBackendInfo.getDataTree().get());
- Assert.assertEquals("default", shardBackendInfo.getShardName());
+ assertEquals(0L, shardBackendInfo.getCookie().longValue());
+ assertEquals(dataTree, shardBackendInfo.getDataTree().get());
+ assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getName());
}
@Test
final ExecutionException caught =
TestUtils.assertOperationThrowsException(() -> TestUtils.getWithTimeout(stage.toCompletableFuture()),
ExecutionException.class);
- Assert.assertEquals(cause, caught.getCause());
+ assertEquals(cause, caught.getCause());
}
@Test
contextProbe.expectMsgClass(ConnectClientRequest.class);
final TestProbe staleBackendProbe = new TestProbe(system, "staleBackend");
final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, staleBackendProbe.ref(),
- Collections.emptyList(), dataTree, 3);
+ List.of(), dataTree, 3);
contextProbe.reply(msg);
//get backend info
final ShardBackendInfo staleBackendInfo = TestUtils.getWithTimeout(backendInfo.toCompletableFuture());
contextProbe.expectMsgClass(ConnectClientRequest.class);
final TestProbe refreshedBackendProbe = new TestProbe(system, "refreshedBackend");
final ConnectClientSuccess msg2 = new ConnectClientSuccess(CLIENT_ID, 1L, refreshedBackendProbe.ref(),
- Collections.emptyList(), dataTree, 3);
+ List.of(), dataTree, 3);
contextProbe.reply(msg2);
final ShardBackendInfo refreshedBackendInfo = TestUtils.getWithTimeout(refreshed.toCompletableFuture());
- Assert.assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie());
- Assert.assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
+ assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie());
+ assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testNotifyWhenBackendInfoIsStale() {
+ final RegisterForShardAvailabilityChanges regMessage =
+ shardManagerProbe.expectMsgClass(RegisterForShardAvailabilityChanges.class);
+ Registration mockReg = mock(Registration.class);
+ shardManagerProbe.reply(new Status.Success(mockReg));
+
+ Consumer<Long> mockCallback = mock(Consumer.class);
+ final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback);
+
+ regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
+ verify(mockCallback, timeout(5000)).accept((long) 0);
+
+ reset(mockCallback);
+ callbackReg.close();
+
+ regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(mockCallback);
}
- private static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
- final ActorContext mock = mock(ActorContext.class);
+ private static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
+ final ActorUtils mock = mock(ActorUtils.class);
final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
final ActorSelection selection = system.actorSelection(actor.path());
final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
promise.success(shardInfo);
- when(mock.findPrimaryShardAsync("default")).thenReturn(promise.future());
+ doReturn(promise.future()).when(mock).findPrimaryShardAsync(DefaultShardStrategy.DEFAULT_SHARD);
return mock;
}
}