+
+ InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
+ InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
+ List<ShardManagerSnapshot> persistedSnapshots =
+ InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class);
+ assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
+ ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
+ assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
+ Sets.newHashSet(shardManagerSnapshot.getShardList()));
+ }};
+
+ LOG.info("testAddShardReplica ending");
+ }
+
+ @Test
+ public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
+ LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
+ newPropsShardMgrWithMockShardActor(), shardMgrID);
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
+ AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
+ ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
+ Props.create(MockRespondActor.class, addServerReply), leaderId);
+
+ MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
+
+ String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(newReplicaId,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+ shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+
+ MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
+
+ Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ // Send message again to verify previous in progress state is cleared
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
+
+ shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
+ leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ expectMsgClass(duration("5 seconds"), Failure.class);
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+ }};
+
+ LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
+ }
+
+ @Test
+ public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
+ LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
+ DataStoreVersions.CURRENT_VERSION), getRef());
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), mockShardActor);
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);