import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
import com.google.common.util.concurrent.Uninterruptibles;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.After;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.After;
import org.opendaylight.controller.cluster.access.commands.ConnectClientFailure;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.commands.ConnectClientFailure;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import scala.concurrent.Promise;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import scala.concurrent.Promise;
public class ModuleShardBackendResolverTest {
private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
public class ModuleShardBackendResolverTest {
private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
system = ActorSystem.apply();
contextProbe = new TestProbe(system, "context");
shardManagerProbe = new TestProbe(system, "ShardManager");
system = ActorSystem.apply();
contextProbe = new TestProbe(system, "context");
shardManagerProbe = new TestProbe(system, "ShardManager");
- final ActorContext actorContext = createActorContextMock(system, contextProbe.ref());
- when(actorContext.getShardManager()).thenReturn(shardManagerProbe.ref());
+ final ActorUtils actorUtils = createActorUtilsMock(system, contextProbe.ref());
+ doReturn(shardManagerProbe.ref()).when(actorUtils).getShardManager();
- moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorContext);
- when(actorContext.getShardStrategyFactory()).thenReturn(shardStrategyFactory);
- when(shardStrategyFactory.getStrategy(YangInstanceIdentifier.EMPTY)).thenReturn(shardStrategy);
+ moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorUtils);
+ doReturn(shardStrategyFactory).when(actorUtils).getShardStrategyFactory();
+ doReturn(shardStrategy).when(shardStrategyFactory).getStrategy(YangInstanceIdentifier.empty());
- when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn(DefaultShardStrategy.DEFAULT_SHARD);
- final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
- Assert.assertEquals(0L, cookie.longValue());
+ doReturn(DefaultShardStrategy.DEFAULT_SHARD).when(shardStrategy).findShard(YangInstanceIdentifier.empty());
+ final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
+ assertEquals(0L, (long) cookie);
- when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("foo");
- final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
- Assert.assertEquals(1L, cookie.longValue());
+ doReturn("foo").when(shardStrategy).findShard(YangInstanceIdentifier.empty());
+ final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.empty());
+ assertEquals(1L, (long) cookie);
contextProbe.expectMsgClass(ConnectClientRequest.class);
final TestProbe backendProbe = new TestProbe(system, "backend");
final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, backendProbe.ref(),
contextProbe.expectMsgClass(ConnectClientRequest.class);
final TestProbe backendProbe = new TestProbe(system, "backend");
final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, backendProbe.ref(),
contextProbe.reply(msg);
final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
contextProbe.reply(msg);
final CompletionStage<ShardBackendInfo> stage = moduleShardBackendResolver.getBackendInfo(0L);
final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
- Assert.assertEquals(0L, shardBackendInfo.getCookie().longValue());
- Assert.assertEquals(dataTree, shardBackendInfo.getDataTree().get());
- Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getName());
+ assertEquals(0L, shardBackendInfo.getCookie().longValue());
+ assertEquals(dataTree, shardBackendInfo.getDataTree().get());
+ assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getName());
final ExecutionException caught =
TestUtils.assertOperationThrowsException(() -> TestUtils.getWithTimeout(stage.toCompletableFuture()),
ExecutionException.class);
final ExecutionException caught =
TestUtils.assertOperationThrowsException(() -> TestUtils.getWithTimeout(stage.toCompletableFuture()),
ExecutionException.class);
contextProbe.expectMsgClass(ConnectClientRequest.class);
final TestProbe staleBackendProbe = new TestProbe(system, "staleBackend");
final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, staleBackendProbe.ref(),
contextProbe.expectMsgClass(ConnectClientRequest.class);
final TestProbe staleBackendProbe = new TestProbe(system, "staleBackend");
final ConnectClientSuccess msg = new ConnectClientSuccess(CLIENT_ID, 0L, staleBackendProbe.ref(),
contextProbe.expectMsgClass(ConnectClientRequest.class);
final TestProbe refreshedBackendProbe = new TestProbe(system, "refreshedBackend");
final ConnectClientSuccess msg2 = new ConnectClientSuccess(CLIENT_ID, 1L, refreshedBackendProbe.ref(),
contextProbe.expectMsgClass(ConnectClientRequest.class);
final TestProbe refreshedBackendProbe = new TestProbe(system, "refreshedBackend");
final ConnectClientSuccess msg2 = new ConnectClientSuccess(CLIENT_ID, 1L, refreshedBackendProbe.ref(),
contextProbe.reply(msg2);
final ShardBackendInfo refreshedBackendInfo = TestUtils.getWithTimeout(refreshed.toCompletableFuture());
contextProbe.reply(msg2);
final ShardBackendInfo refreshedBackendInfo = TestUtils.getWithTimeout(refreshed.toCompletableFuture());
- Assert.assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie());
- Assert.assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
+ assertEquals(staleBackendInfo.getCookie(), refreshedBackendInfo.getCookie());
+ assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback);
regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback);
regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
- private static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
- final ActorContext mock = mock(ActorContext.class);
+ private static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
+ final ActorUtils mock = mock(ActorUtils.class);
final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
final ActorSelection selection = system.actorSelection(actor.path());
final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
promise.success(shardInfo);
final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
final ActorSelection selection = system.actorSelection(actor.path());
final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
promise.success(shardInfo);