private static String mockShardName;
private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
- dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
+ dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
private final Collection<ActorSystem> actorSystems = new ArrayList<>();
@Test
public void testAddShardReplica() throws Exception {
- LOG.info("testAddShardReplica starting");
-
MockConfiguration mockConfig =
new MockConfiguration(ImmutableMap.<String, List<String>>builder().
put("default", Arrays.asList("member-1", "member-2")).
put("astronauts", Arrays.asList("member-2")).build());
- String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+ final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+ datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
// Create an ActorSystem ShardManager actor for member-1.
final ActorSystem system1 = newActorSystem("Member1");
newReplicaShardManager.underlyingActor().waitForMemberUp();
leaderShardManager.underlyingActor().waitForMemberUp();
+ //Have a dummy snapshot to be overwritten by the new data persisted.
+ String[] restoredShards = {"default", "people"};
+ ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+ InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
+
+ InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
+ InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
+
//construct a mock response message
AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
mockShardLeaderActor.underlyingActor().updateResponse(response);
AddServer.class);
String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
- newReplicaShardManager.underlyingActor()
- .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts"));
expectMsgClass(duration("5 seconds"), Status.Success.class);
+
+ InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
+ InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
+ List<ShardManagerSnapshot> persistedSnapshots =
+ InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class);
+ assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
+ ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
+ assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
+ Sets.newHashSet(shardManagerSnapshot.getShardList()));
}};
LOG.info("testAddShardReplica ending");
public void saveSnapshot(Object obj) {
snapshot = (ShardManagerSnapshot) obj;
snapshotPersist.countDown();
+ super.saveSnapshot(obj);
}
void verifySnapshotPersisted(Set<String> shardList) {