+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
+
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
+ final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
+ newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
+ shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
+ terminateWatcher.watch(mockNewReplicaShardActor);
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+
+ AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
+ assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
+ addServerMsg.getNewServerId());
+ mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
+
+ Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
+
+ shardManager.tell(new FindLocalShard("astronauts", false), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+ terminateWatcher.expectTerminated(mockNewReplicaShardActor);
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+ mockShardLeaderKit.expectMsgClass(AddServer.class);
+ mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
+ failure = expectMsgClass(duration("5 seconds"), Failure.class);
+ assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
+ JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
+
+ MockConfiguration mockConfig =
+ new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
+ newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props(), shardMgrID);
+ shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+
+ mockShardLeaderKit.expectMsgClass(AddServer.class);
+
+ shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef());
+
+ secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig).
+ shardActor(mockShardActor).props(), shardMgrID);