+ LOG.info("testAddShardReplica ending");
+ }
+
+ @Test
+ public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
+ LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
+ newPropsShardMgrWithMockShardActor(), shardMgrID);
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
+ AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
+ ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
+ Props.create(MockRespondActor.class, addServerReply), leaderId);
+
+ MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
+
+ String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(newReplicaId,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+ shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+
+ MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
+
+ Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ // Send message again to verify previous in progress state is cleared
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
+
+ shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
+ leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ expectMsgClass(duration("5 seconds"), Failure.class);
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+ }};
+
+ LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
+ }
+
+ @Test
+ public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
+ LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
+ DataStoreVersions.CURRENT_VERSION), getRef());
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), mockShardActor);
+
+ shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+ Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+ }};
+
+ LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
+ }
+
+ @Test
+ public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
+ LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
+ new JavaTestKit(getSystem()) {{
+ JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
+
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
+ final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
+ newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
+ shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
+ terminateWatcher.watch(mockNewReplicaShardActor);
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+
+ AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
+ assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
+ addServerMsg.getNewServerId());
+ mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
+
+ Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
+
+ shardManager.tell(new FindLocalShard("astronauts", false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+ terminateWatcher.expectTerminated(mockNewReplicaShardActor);
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+ mockShardLeaderKit.expectMsgClass(AddServer.class);
+ mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
+ failure = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
+ }};
+
+ LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
+ }
+
+ @Test
+ public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
+ testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
+ AddServer.class, new AddShardReplica("astronauts"));