- newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
- new ClusterWrapperImpl(system2)).props().
- withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
-
- new JavaTestKit(system1) {{
-
- newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
-
- leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
-
- String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
- short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
- leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
- Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
- leaderShardManager.tell(new RoleChangeNotification(memberId2,
- RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
-
- newReplicaShardManager.underlyingActor().waitForMemberUp();
- leaderShardManager.underlyingActor().waitForMemberUp();
-
- //Have a dummy snapshot to be overwritten by the new data persisted.
- String[] restoredShards = {"default", "people"};
- ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
- InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
- Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
-
- InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
- InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
-
- //construct a mock response message
- AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
- mockShardLeaderActor.underlyingActor().updateResponse(response);
- newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
- AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
- AddServer.class);
- String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
- assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
- expectMsgClass(duration("5 seconds"), Status.Success.class);
-
- InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
- InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
- List<ShardManagerSnapshot> persistedSnapshots =
- InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class);
- assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
- ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
- assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
- Sets.newHashSet(shardManagerSnapshot.getShardList()));
- }};
+ newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
+ .cluster(new ClusterWrapperImpl(system2)).props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()),
+ shardManagerID);
+
+ new JavaTestKit(system1) {
+ {
+ newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
+
+ short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
+ leaderShardManager.tell(
+ new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
+ mockShardLeaderActor);
+ leaderShardManager.tell(
+ new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
+ mockShardLeaderActor);
+
+ newReplicaShardManager.underlyingActor().waitForMemberUp();
+ leaderShardManager.underlyingActor().waitForMemberUp();
+
+ // Have a dummy snapshot to be overwritten by the new data
+ // persisted.
+ String[] restoredShards = { "default", "people" };
+ ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+ InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
+
+ InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
+ InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
+
+ // construct a mock response message
+ newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
+ AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
+ AddServer.class);
+ String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
+ assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
+ expectMsgClass(duration("5 seconds"), Status.Success.class);
+
+ InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
+ InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
+ List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
+ ShardManagerSnapshot.class);
+ assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
+ ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
+ assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
+ Sets.newHashSet(shardManagerSnapshot.getShardList()));
+ }
+ };