import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
import akka.actor.Status.Success;
import akka.cluster.Cluster;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileInputStream;
import java.net.URI;
+import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
import org.opendaylight.yangtools.yang.common.RpcError;
get(5, TimeUnit.SECONDS);
assertEquals("isSuccessful", false, rpcResult.isSuccessful());
assertEquals("getErrors", 1, rpcResult.getErrors().size());
-
- service.close();
}
private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people").
setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
verifyFailedRpcResult(rpcResult);
-
- service.close();
}
private static NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
verifySuccessfulRpcResult(rpcResult);
verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
-
- service.close();
}
private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()).
get(10, TimeUnit.SECONDS);
verifySuccessfulRpcResult(rpcResult);
- service3.close();
verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).
get(10, TimeUnit.SECONDS);
verifySuccessfulRpcResult(rpcResult);
- service1.close();
verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()).
get(10, TimeUnit.SECONDS);
verifySuccessfulRpcResult(rpcResult);
- service1.close();
verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() {
@Override
verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
-
- service.close();
}
@Test
verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
verifyNoShardPresent(replicaNode3.configDataStore(), "people");
verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
+ }
+
+ @Test
+ public void testChangeMemberVotingStatesForShard() throws Exception {
+ String name = "testChangeMemberVotingStatusForShard";
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+ verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
- service3.close();
+ // Invoke RPC service on member-3 to change voting status
+
+ ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore());
+
+ RpcResult<Void> rpcResult = service3.changeMemberVotingStatesForShard(
+ new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars").
+ setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of(
+ new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(),
+ new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()).
+ get(10, TimeUnit.SECONDS);
+ verifySuccessfulRpcResult(rpcResult);
+
+ verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", false));
+ verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", false));
+ verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", false));
}
@Test
- public void testConvertMembersToVotingForAllShards() {
- // TODO implement
+ public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
+ String name = "testChangeMemberVotingStatesForSingleNodeShard";
+ String moduleShardsConfig = "module-shards-member1.conf";
+ MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ leaderNode.configDataStore().waitTillReady();
+
+ // Invoke RPC service on member-3 to change voting status
+
+ ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
+ leaderNode.operDataStore());
+
+ RpcResult<Void> rpcResult = service.changeMemberVotingStatesForShard(
+ new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars").
+ setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of(
+ new MemberVotingStateBuilder().setMemberName("member-1").setVoting(false).build())).build()).
+ get(10, TimeUnit.SECONDS);
+ verifyFailedRpcResult(rpcResult);
+
+ verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", true));
+ }
+
+ @Test
+ public void testChangeMemberVotingStatesForAllShards() throws Exception {
+ String name = "testChangeMemberVotingStatesForAllShards";
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ leaderNode1.operDataStore().waitTillReady();
+ verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+ verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
+ // Invoke RPC service on member-3 to change voting status
+
+ ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore());
+
+ RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
+ new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
+ new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(),
+ new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()).
+ get(10, TimeUnit.SECONDS);
+ ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"}, new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", false));
}
@Test
- public void testConvertMembersToNonvotingForAllShards() {
- // TODO implement
+ public void testFlipMemberVotingStates() throws Exception {
+ String name = "testFlipMemberVotingStates";
+
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+ new ServerInfo("member-3", false)));
+
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ leaderNode1.operDataStore().waitTillReady();
+ verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+ new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", false));
+
+ ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore());
+
+ RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards().
+ get(10, TimeUnit.SECONDS);
+ FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", true));
+
+ // Leadership should have transferred to member 3 since it is the only remaining voting member.
+ verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-3"));
+ });
+
+ verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-3"));
+ });
+
+ // Flip the voting states back to the original states.
+
+ rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
+ result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+ new SimpleEntry<>("member-3", false));
+
+ // Leadership should have transferred to member 1 or 2.
+ verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
+ });
+ }
+
+ @Test
+ public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
+ String name = "testFlipMemberVotingStatesWithNoInitialLeader";
+
+ // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
+ // non-voting and simulated as down by not starting them up.
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo("member-1", false), new ServerInfo("member-2", false),
+ new ServerInfo("member-3", false), new ServerInfo("member-4", true),
+ new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
+
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ // Initially there won't be a leader b/c all the up nodes are non-voting.
+
+ replicaNode1.waitForMembersUp("member-2", "member-3");
+
+ verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", false),
+ new SimpleEntry<>("member-2", false), new SimpleEntry<>("member-3", false),
+ new SimpleEntry<>("member-4", true), new SimpleEntry<>("member-5", true),
+ new SimpleEntry<>("member-6", true));
+
+ verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
+ assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
+
+ ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
+ replicaNode1.operDataStore());
+
+ RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+ get(10, TimeUnit.SECONDS);
+ FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyVotingStates(new DistributedDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+ new SimpleEntry<>("member-3", true), new SimpleEntry<>("member-4", false),
+ new SimpleEntry<>("member-5", false), new SimpleEntry<>("member-6", false));
+
+ // Since member 1 was changed to voting and there was no leader, it should've started and election
+ // and become leader
+ verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-1"));
+ });
+
+ verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-1"));
+ });
+ }
+
+ @Test
+ public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
+ String name = "testFlipMemberVotingStatesWithVotingMembersDown";
+
+ // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+ new ServerInfo("member-3", true), new ServerInfo("member-4", false),
+ new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
+
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ leaderNode1.operDataStore().waitTillReady();
+ verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+ new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", true),
+ new SimpleEntry<>("member-4", false), new SimpleEntry<>("member-5", false),
+ new SimpleEntry<>("member-6", false));
+
+ ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+ leaderNode1.operDataStore());
+
+ RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+ get(10, TimeUnit.SECONDS);
+ FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
+ verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", false), new SimpleEntry<>("member-4", true),
+ new SimpleEntry<>("member-5", true), new SimpleEntry<>("member-6", true));
+
+ // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
+ // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
+ verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
+ });
+ }
+
+ private void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
+ String member, String datastoreTypeSuffix, String... shards) {
+ String[] datastoreTypes = {"config_", "oper_"};
+ for(String type: datastoreTypes) {
+ for(String shard: shards) {
+ List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
+ for(ServerInfo info: serverConfig.getServerConfig()) {
+ newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
+ type + datastoreTypeSuffix).toString(), info.isVoting()));
+ }
+
+ String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
+ type + datastoreTypeSuffix).toString();
+ InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
+ InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1,
+ new ServerConfigurationPayload(newServerInfo)));
+ }
+ }
+ }
+
+ @SafeVarargs
+ private static void verifyVotingStates(DistributedDataStore[] datastores, String[] shards,
+ SimpleEntry<String, Boolean>... expStates) throws Exception {
+ for(DistributedDataStore datastore: datastores) {
+ for(String shard: shards) {
+ verifyVotingStates(datastore, shard, expStates);
+ }
+ }
+ }
+
+ @SafeVarargs
+ private static void verifyVotingStates(DistributedDataStore datastore, String shardName,
+ SimpleEntry<String, Boolean>... expStates) throws Exception {
+ String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
+ Map<String, Boolean> expStateMap = new HashMap<>();
+ for(Entry<String, Boolean> e: expStates) {
+ expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
+ datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
+ }
+
+ verifyRaftState(datastore, shardName, raftState -> {
+ String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
+ datastore.getActorContext().getDataStoreName()).toString();
+ assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
+ for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+ assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
+ }
+ });
}
private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {