+ public void testFlipMemberVotingStates() throws Exception {
+ String name = "testFlipMemberVotingStates";
+
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+ new ServerInfo("member-3", false)));
+
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ leaderNode1.operDataStore().waitTillReady();
+ verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+ new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", false));
+
+ ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore());
+
+ RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards().
+ get(10, TimeUnit.SECONDS);
+ FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", true));
+
+ // Leadership should have transferred to member 3 since it is the only remaining voting member.
+ verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-3"));
+ });
+
+ verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-3"));
+ });
+
+ // Flip the voting states back to the original states.
+
+ rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
+ result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+ new SimpleEntry<>("member-3", false));
+
+ // Leadership should have transferred to member 1 or 2.
+ verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
+ });
+ }
+
+ @Test
+ public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
+ String name = "testFlipMemberVotingStatesWithNoInitialLeader";
+
+ // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
+ // non-voting and simulated as down by not starting them up.
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo("member-1", false), new ServerInfo("member-2", false),
+ new ServerInfo("member-3", false), new ServerInfo("member-4", true),
+ new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
+
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ // Initially there won't be a leader b/c all the up nodes are non-voting.
+
+ replicaNode1.waitForMembersUp("member-2", "member-3");
+
+ verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", false),
+ new SimpleEntry<>("member-2", false), new SimpleEntry<>("member-3", false),
+ new SimpleEntry<>("member-4", true), new SimpleEntry<>("member-5", true),
+ new SimpleEntry<>("member-6", true));
+
+ verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
+ assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
+
+ ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
+ replicaNode1.operDataStore());
+
+ RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+ get(10, TimeUnit.SECONDS);
+ FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyVotingStates(new DistributedDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+ new SimpleEntry<>("member-3", true), new SimpleEntry<>("member-4", false),
+ new SimpleEntry<>("member-5", false), new SimpleEntry<>("member-6", false));
+
+ // Since member 1 was changed to voting and there was no leader, it should've started and election
+ // and become leader
+ verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-1"));
+ });
+
+ verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-1"));
+ });
+ }
+
+ @Test
+ public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
+ String name = "testFlipMemberVotingStatesWithVotingMembersDown";
+
+ // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+ new ServerInfo("member-3", true), new ServerInfo("member-4", false),
+ new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
+
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ leaderNode1.operDataStore().waitTillReady();
+ verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+ new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", true),
+ new SimpleEntry<>("member-4", false), new SimpleEntry<>("member-5", false),
+ new SimpleEntry<>("member-6", false));
+
+ ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+ leaderNode1.operDataStore());
+
+ RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+ get(10, TimeUnit.SECONDS);
+ FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
+ verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", false), new SimpleEntry<>("member-4", true),
+ new SimpleEntry<>("member-5", true), new SimpleEntry<>("member-6", true));
+
+ // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
+ // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
+ verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
+ });
+ }
+
+ private void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
+ String member, String datastoreTypeSuffix, String... shards) {
+ String[] datastoreTypes = {"config_", "oper_"};
+ for(String type: datastoreTypes) {
+ for(String shard: shards) {
+ List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
+ for(ServerInfo info: serverConfig.getServerConfig()) {
+ newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
+ type + datastoreTypeSuffix).toString(), info.isVoting()));
+ }
+
+ String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
+ type + datastoreTypeSuffix).toString();
+ InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
+ InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1,
+ new ServerConfigurationPayload(newServerInfo)));
+ }
+ }
+ }
+
+ @SafeVarargs
+ private static void verifyVotingStates(DistributedDataStore[] datastores, String[] shards,
+ SimpleEntry<String, Boolean>... expStates) throws Exception {
+ for(DistributedDataStore datastore: datastores) {
+ for(String shard: shards) {
+ verifyVotingStates(datastore, shard, expStates);
+ }
+ }
+ }
+
+ @SafeVarargs
+ private static void verifyVotingStates(DistributedDataStore datastore, String shardName,
+ SimpleEntry<String, Boolean>... expStates) throws Exception {
+ String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
+ Map<String, Boolean> expStateMap = new HashMap<>();
+ for(Entry<String, Boolean> e: expStates) {
+ expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
+ datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
+ }
+
+ verifyRaftState(datastore, shardName, raftState -> {
+ String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
+ datastore.getActorContext().getDataStoreName()).toString();
+ assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
+ for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+ assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
+ }
+ });