import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
-import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
private static TestActorRef<MessageCollectorActor> mockShardActor;
- private static String mockShardName;
+ private static ShardIdentifier mockShardName;
private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
InMemorySnapshotStore.clear();
if(mockShardActor == null) {
- mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
- mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
+ mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config");
+ mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
+ mockShardName.toString());
}
mockShardActor.underlyingActor().clear();
final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
final CountDownLatch newShardActorLatch = new CountDownLatch(2);
class LocalShardManager extends ShardManager {
- public LocalShardManager(AbstractBuilder<?> builder) {
- super(builder);
+ public LocalShardManager(AbstractShardManagerCreator<?> creator) {
+ super(creator);
}
@Override
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new LocalShardManager(new GenericBuilder<LocalShardManager>(LocalShardManager.class).
+ return new LocalShardManager(new GenericCreator<LocalShardManager>(LocalShardManager.class).
datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache).
configuration(mockConfig));
}
shardManager.tell(new ActorInitialized(), mockShardActor);
DataTree mockDataTree = mock(DataTree.class);
- shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
DataStoreVersions.CURRENT_VERSION), getRef());
MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
shardManager.tell(new RoleChangeNotification(memberId1,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
- shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
- leaderVersion), mockShardActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
DataTree mockDataTree = mock(DataTree.class);
- shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
DataStoreVersions.CURRENT_VERSION), mockShardActor);
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
DataTree mockDataTree = mock(DataTree.class);
- shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
DataStoreVersions.CURRENT_VERSION), mockShardActor);
LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
- Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
+ mock(DataTree.class), leaderVersion), mockShardActor2);
shardManager2.tell(new RoleChangeNotification(memberId2,
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
- Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
+ mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
shardManager1.tell(new RoleChangeNotification(memberId1,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
- shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
+ shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION),
mockShardActor2);
shardManager2.tell(new RoleChangeNotification(memberId2,
String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
- Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
+ mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
shardManager1.tell(new RoleChangeNotification(memberId1,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
- shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
+ shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION),
mockShardActor2);
shardManager2.tell(new RoleChangeNotification(memberId2,
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
- mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
+ mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
shardManager1.tell(MockClusterWrapper.
createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
- shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
+ shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION), mockShardActor1);
shardManager1.tell(new RoleChangeNotification(memberId1,
RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar")));
InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
- TestShardManager shardManager = newTestShardManager();
+ newTestShardManager();
InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
verify(ready, never()).countDown();
shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
- Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
+ mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
verify(ready, times(1)).countDown();
}
shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
- "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
+ "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION));
verify(ready, times(1)).countDown();
verify(ready, never()).countDown();
shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
- "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
+ "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION));
shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
- Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
+ mock(DataTree.class), leaderVersion), mockShardLeaderActor);
leaderShardManager.tell(new RoleChangeNotification(memberId2,
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.tell(new RoleChangeNotification(newReplicaId,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
- shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
+ shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId,
DataStoreVersions.CURRENT_VERSION), mockShardActor);
shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION), getRef());
shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
RaftState.Leader.name())), mockShardActor);
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), respondActor);
- shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
DataStoreVersions.CURRENT_VERSION), getRef());
shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
RaftState.Leader.name())), respondActor);
String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
- Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
+ mock(DataTree.class), leaderVersion), mockShardLeaderActor);
leaderShardManager.tell(new RoleChangeNotification(memberId2,
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
newReplicaShardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2,
- Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor);
+ mock(DataTree.class), leaderVersion), mockShardActor);
newReplicaShardManager.tell(new RoleChangeNotification(memberId1,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
return new Builder(datastoreContextBuilder);
}
- private static class Builder extends AbstractGenericBuilder<Builder, TestShardManager> {
+ private static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
private ActorRef shardActor;
private final Map<String, ActorRef> shardActors = new HashMap<>();
}
}
- private static abstract class AbstractGenericBuilder<T extends AbstractGenericBuilder<T, ?>, C extends ShardManager>
- extends ShardManager.AbstractBuilder<T> {
+ private static abstract class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
+ extends AbstractShardManagerCreator<T> {
private final Class<C> shardManagerClass;
- AbstractGenericBuilder(Class<C> shardManagerClass) {
+ AbstractGenericCreator(Class<C> shardManagerClass) {
this.shardManagerClass = shardManagerClass;
cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
}
}
- private static class GenericBuilder<C extends ShardManager> extends AbstractGenericBuilder<GenericBuilder<C>, C> {
- GenericBuilder(Class<C> shardManagerClass) {
+ private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
+ GenericCreator(Class<C> shardManagerClass) {
super(shardManagerClass);
}
}