- private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+ private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
if (system == getSystem()) {
return actorFactory.createActor(Props.create(MessageCollectorActor.class), name);
String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
if (system == getSystem()) {
return actorFactory.createActor(Props.create(MessageCollectorActor.class), name);
DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
.distributedDataStore(mock(DistributedDataStore.class));
}
return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
.distributedDataStore(mock(DistributedDataStore.class));
}
return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
.withDispatcher(Dispatchers.DefaultDispatcherId());
}
return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
.withDispatcher(Dispatchers.DefaultDispatcherId());
}
TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
TestShardManager shardManager = shardManagerActor.underlyingActor();
shardManager.waitForRecoveryComplete();
return shardManager;
}
TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
TestShardManager shardManager = shardManagerActor.underlyingActor();
shardManager.waitForRecoveryComplete();
return shardManager;
}
- private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
+ private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
+ final JavaTestKit kit) {
AssertionError last = null;
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
AssertionError last = null;
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
- private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
+ private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final JavaTestKit kit, final String msg) {
Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
if (reply instanceof Failure) {
throw new AssertionError(msg + " failed", ((Failure)reply).cause());
Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
if (reply instanceof Failure) {
throw new AssertionError(msg + " failed", ((Failure)reply).cause());
final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
final CountDownLatch newShardActorLatch = new CountDownLatch(2);
class LocalShardManager extends ShardManager {
final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
final CountDownLatch newShardActorLatch = new CountDownLatch(2);
class LocalShardManager extends ShardManager {
LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
@Override
LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
@Override
private CountDownLatch memberReachableReceived = new CountDownLatch(1);
private volatile MessageInterceptor messageInterceptor;
private CountDownLatch memberReachableReceived = new CountDownLatch(1);
private volatile MessageInterceptor messageInterceptor;
try {
if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
getSender().tell(messageInterceptor.apply(message), getSelf());
try {
if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
getSender().tell(messageInterceptor.apply(message), getSelf());
private ActorRef shardActor;
private final Map<String, ActorRef> shardActors = new HashMap<>();
private ActorRef shardActor;
private final Map<String, ActorRef> shardActors = new HashMap<>();
super(TestShardManager.class);
datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
}
super(TestShardManager.class);
datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
}
assertEquals("saveSnapshot invoked", true,
Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
}
@Override
assertEquals("saveSnapshot invoked", true,
Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
}
@Override
this.shardManagerClass = shardManagerClass;
cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
.primaryShardInfoCache(new PrimaryShardInfoFutureCache());
this.shardManagerClass = shardManagerClass;
cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
.primaryShardInfoCache(new PrimaryShardInfoFutureCache());
private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
return new MessageInterceptor() {
@Override
private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
return new MessageInterceptor() {
@Override