private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
- private ActorSystem newActorSystem(String config) {
+ private ActorSystem newActorSystem(final String config) {
return newActorSystem("cluster-test", config);
}
- private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+ private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
if (system == getSystem()) {
return actorFactory.createActor(Props.create(MessageCollectorActor.class), name);
return newShardMgrProps(new MockConfiguration());
}
- private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
+ private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
}
- private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
+ private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
.distributedDataStore(mock(DistributedDataStore.class));
}
Dispatchers.DefaultDispatcherId());
}
- private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
+ private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
.withDispatcher(Dispatchers.DefaultDispatcherId());
}
return newTestShardManager(newShardMgrProps());
}
- private TestShardManager newTestShardManager(Props props) {
+ private TestShardManager newTestShardManager(final Props props) {
TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
TestShardManager shardManager = shardManagerActor.underlyingActor();
shardManager.waitForRecoveryComplete();
return shardManager;
}
- private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
+ private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
+ final JavaTestKit kit) {
AssertionError last = null;
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
}
@SuppressWarnings("unchecked")
- private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
+ private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final JavaTestKit kit, final String msg) {
Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
if (reply instanceof Failure) {
throw new AssertionError(msg + " failed", ((Failure)reply).cause());
final MockConfiguration mockConfig = new MockConfiguration() {
@Override
- public Collection<String> getMemberShardNames(MemberName memberName) {
+ public Collection<String> getMemberShardNames(final MemberName memberName) {
return Arrays.asList("default", "topology");
}
@Override
- public Collection<MemberName> getMembersFromShardName(String shardName) {
+ public Collection<MemberName> getMembersFromShardName(final String shardName) {
return members("member-1");
}
};
final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
final CountDownLatch newShardActorLatch = new CountDownLatch(2);
class LocalShardManager extends ShardManager {
- LocalShardManager(AbstractShardManagerCreator<?> creator) {
+ LocalShardManager(final AbstractShardManagerCreator<?> creator) {
super(creator);
}
@Override
- protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ protected ActorRef newShardActor(final ShardInformation info) {
Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
ActorRef ref = null;
if (entry != null) {
LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
@Override
- public List<String> getMemberShardNames(MemberName memberName) {
+ public List<String> getMemberShardNames(final MemberName memberName) {
return Arrays.asList("default", "astronauts");
}
}));
};
}
- private static List<MemberName> members(String... names) {
+ private static List<MemberName> members(final String... names) {
return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
}
private CountDownLatch memberReachableReceived = new CountDownLatch(1);
private volatile MessageInterceptor messageInterceptor;
- private TestShardManager(Builder builder) {
+ private TestShardManager(final Builder builder) {
super(builder);
shardActor = builder.shardActor;
shardActors = builder.shardActors;
}
@Override
- protected void handleRecover(Object message) throws Exception {
+ protected void handleRecover(final Object message) throws Exception {
try {
super.handleRecover(message);
} finally {
}
}
- private void countDownIfOther(final Member member, CountDownLatch latch) {
+ private void countDownIfOther(final Member member, final CountDownLatch latch) {
if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
latch.countDown();
}
}
@Override
- public void handleCommand(Object message) throws Exception {
+ public void handleCommand(final Object message) throws Exception {
try {
if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
getSender().tell(messageInterceptor.apply(message), getSelf());
}
}
- void setMessageInterceptor(MessageInterceptor messageInterceptor) {
+ void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
this.messageInterceptor = messageInterceptor;
}
findPrimaryMessageReceived = new CountDownLatch(1);
}
- public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
+ public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
return new Builder(datastoreContextBuilder);
}
private ActorRef shardActor;
private final Map<String, ActorRef> shardActors = new HashMap<>();
- Builder(DatastoreContext.Builder datastoreContextBuilder) {
+ Builder(final DatastoreContext.Builder datastoreContextBuilder) {
super(TestShardManager.class);
datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
}
- Builder shardActor(ActorRef newShardActor) {
+ Builder shardActor(final ActorRef newShardActor) {
this.shardActor = newShardActor;
return this;
}
- Builder addShardActor(String shardName, ActorRef actorRef) {
+ Builder addShardActor(final String shardName, final ActorRef actorRef) {
shardActors.put(shardName, actorRef);
return this;
}
}
@Override
- public void saveSnapshot(Object obj) {
+ public void saveSnapshot(final Object obj) {
snapshot = (ShardManagerSnapshot) obj;
snapshotPersist.countDown();
super.saveSnapshot(obj);
}
- void verifySnapshotPersisted(Set<String> shardList) {
+ void verifySnapshotPersisted(final Set<String> shardList) {
assertEquals("saveSnapshot invoked", true,
Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
}
@Override
- protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ protected ActorRef newShardActor(final ShardInformation info) {
if (shardActors.get(info.getShardName()) != null) {
return shardActors.get(info.getShardName());
}
return shardActor;
}
- return super.newShardActor(schemaContext, info);
+ return super.newShardActor(info);
}
}
extends AbstractShardManagerCreator<T> {
private final Class<C> shardManagerClass;
- AbstractGenericCreator(Class<C> shardManagerClass) {
+ AbstractGenericCreator(final Class<C> shardManagerClass) {
this.shardManagerClass = shardManagerClass;
cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
.primaryShardInfoCache(new PrimaryShardInfoFutureCache());
}
private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
- GenericCreator(Class<C> shardManagerClass) {
+ GenericCreator(final Class<C> shardManagerClass) {
super(shardManagerClass);
}
}
private static final long serialVersionUID = 1L;
private final Creator<ShardManager> delegate;
- DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
+ DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
this.delegate = delegate;
}
private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
return new MessageInterceptor() {
@Override
- public Object apply(Object message) {
+ public Object apply(final Object message) {
return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
}
@Override
- public boolean canIntercept(Object message) {
+ public boolean canIntercept(final Object message) {
return message instanceof FindPrimary;
}
};
private final Class<?> requestClass;
@SuppressWarnings("unused")
- MockRespondActor(Class<?> requestClass, Object responseMsg) {
+ MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
this.requestClass = requestClass;
this.responseMsg = responseMsg;
}
@Override
- public void onReceive(Object message) throws Exception {
+ public void onReceive(final Object message) throws Exception {
if (message.equals(CLEAR_RESPONSE)) {
responseMsg = null;
} else {