+ @Test
+ public void testChangeServersVotingStatus() {
+ final TestKit kit = new TestKit(getSystem());
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ ActorRef respondActor = actorFactory
+ .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
+
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+ shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
+ shardManager.tell(new ActorInitialized(), respondActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
+ DataStoreVersions.CURRENT_VERSION), kit.getRef());
+ shardManager.tell(
+ new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
+ respondActor);
+
+ shardManager.tell(
+ new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
+
+ ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
+ .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
+ assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
+ ImmutableMap.of(ShardIdentifier
+ .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
+ Boolean.TRUE));
+
+ kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
+ }
+
+ @Test
+ public void testChangeServersVotingStatusWithNoLeader() {
+ final TestKit kit = new TestKit(getSystem());
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ ActorRef respondActor = actorFactory
+ .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
+
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+ shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
+ shardManager.tell(new ActorInitialized(), respondActor);
+ shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
+
+ shardManager.tell(
+ new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
+
+ MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
+
+ Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
+ assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRegisterForShardLeaderChanges() {
+ LOG.info("testRegisterForShardLeaderChanges starting");
+
+ final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+ final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+ final TestKit kit = new TestKit(getSystem());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ final Consumer<String> mockCallback = mock(Consumer.class);
+ shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
+
+ final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
+ final Registration reg = (Registration) reply.status();
+
+ final DataTree mockDataTree = mock(DataTree.class);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(mockCallback);
+
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ reg.close();
+
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(mockCallback);
+
+ LOG.info("testRegisterForShardLeaderChanges ending");
+ }
+
+ public static class TestShardManager extends ShardManager {