2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.admin;
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static java.util.Objects.requireNonNull;
13 import static org.hamcrest.CoreMatchers.anyOf;
14 import static org.hamcrest.CoreMatchers.containsString;
15 import static org.hamcrest.MatcherAssert.assertThat;
16 import static org.junit.Assert.assertEquals;
17 import static org.junit.Assert.assertFalse;
18 import static org.junit.Assert.assertNotNull;
19 import static org.junit.Assert.assertNull;
20 import static org.junit.Assert.assertTrue;
21 import static org.junit.Assert.fail;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
24 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
26 import akka.actor.ActorRef;
27 import akka.actor.PoisonPill;
28 import akka.actor.Status.Success;
29 import akka.cluster.Cluster;
30 import com.google.common.collect.Lists;
32 import java.nio.file.Files;
33 import java.util.ArrayList;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.List;
38 import java.util.Optional;
40 import java.util.concurrent.TimeUnit;
41 import org.apache.commons.lang3.SerializationUtils;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.access.concepts.MemberName;
46 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
47 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
48 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
49 import org.opendaylight.controller.cluster.datastore.MemberNode;
50 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
51 import org.opendaylight.controller.cluster.datastore.Shard;
52 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
55 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
56 import org.opendaylight.controller.cluster.raft.RaftState;
57 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
58 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
59 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
60 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
61 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
62 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
63 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInputBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey;
78 import org.opendaylight.yangtools.yang.common.RpcResult;
79 import org.opendaylight.yangtools.yang.common.XMLNamespace;
80 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
83 * Unit tests for ClusterAdminRpcService.
85 * @author Thomas Pantelis
87 public class ClusterAdminRpcServiceTest {
88 record ExpState(String name, boolean voting) {
94 private static final MemberName MEMBER_1 = MemberName.forName("member-1");
95 private static final MemberName MEMBER_2 = MemberName.forName("member-2");
96 private static final MemberName MEMBER_3 = MemberName.forName("member-3");
97 private final List<MemberNode> memberNodes = new ArrayList<>();
100 public void setUp() {
101 InMemoryJournal.clear();
102 InMemorySnapshotStore.clear();
106 public void tearDown() {
107 for (var member : Lists.reverse(memberNodes)) {
114 public void testBackupDatastore() throws Exception {
115 final var node = MemberNode.builder(memberNodes)
116 .akkaConfig("Member1")
117 .moduleShardsConfig("module-shards-member1.conf")
118 .waitForShardLeader("cars", "people")
119 .testName("testBackupDatastore")
122 final var fileName = "target/testBackupDatastore";
123 final var file = new File(fileName);
126 final var service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
128 var rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
129 .get(5, TimeUnit.SECONDS);
130 verifySuccessfulRpcResult(rpcResult);
132 try (var fis = Files.newInputStream(file.toPath())) {
133 final List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
134 assertEquals("DatastoreSnapshot size", 2, snapshots.size());
136 final var map = Map.of(
137 snapshots.get(0).getType(), snapshots.get(0),
138 snapshots.get(1).getType(), snapshots.get(1));
139 verifyDatastoreSnapshot(node.configDataStore().getActorUtils().getDataStoreName(),
140 map.get(node.configDataStore().getActorUtils().getDataStoreName()), "cars", "people");
142 new File(fileName).delete();
145 // Test failure by killing a shard.
147 node.configDataStore().getActorUtils().getShardManager().tell(node.datastoreContextBuilder()
148 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
150 final var carsShardActor = node.configDataStore().getActorUtils().findLocalShard("cars").orElseThrow();
151 node.kit().watch(carsShardActor);
152 carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
153 node.kit().expectTerminated(carsShardActor);
155 rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
156 .get(5, TimeUnit.SECONDS);
157 assertFalse("isSuccessful", rpcResult.isSuccessful());
158 assertEquals("getErrors", 1, rpcResult.getErrors().size());
161 private static void verifyDatastoreSnapshot(final String type, final DatastoreSnapshot datastoreSnapshot,
162 final String... expShardNames) {
163 assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
164 var shardNames = new HashSet<String>();
165 for (var snapshot : datastoreSnapshot.getShardSnapshots()) {
166 shardNames.add(snapshot.getName());
169 assertEquals("DatastoreSnapshot shard names", Set.of(expShardNames), shardNames);
173 public void testGetPrefixShardRole() throws Exception {
174 String name = "testGetPrefixShardRole";
175 String moduleShardsConfig = "module-shards-default-member-1.conf";
177 final var member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
178 .moduleShardsConfig(moduleShardsConfig).build();
180 member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default");
184 public void testModuleShardLeaderMovement() throws Exception {
185 String name = "testModuleShardLeaderMovement";
186 String moduleShardsConfig = "module-shards-member1.conf";
188 final var member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
189 .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
190 final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
191 .moduleShardsConfig(moduleShardsConfig).build();
192 final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
193 .moduleShardsConfig(moduleShardsConfig).build();
195 member1.waitForMembersUp("member-2", "member-3");
196 replicaNode2.waitForMembersUp("member-1");
197 replicaNode3.waitForMembersUp("member-1", "member-2");
199 doAddShardReplica(replicaNode2, "cars", "member-1");
200 doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
202 verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
204 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
206 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
208 doMakeShardLeaderLocal(member1, "cars", "member-1");
209 verifyRaftState(replicaNode2.configDataStore(), "cars",
210 raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
211 verifyRaftState(replicaNode3.configDataStore(), "cars",
212 raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
214 doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
215 verifyRaftState(member1.configDataStore(), "cars",
216 raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
217 verifyRaftState(replicaNode3.configDataStore(), "cars",
218 raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
220 replicaNode2.waitForMembersUp("member-3");
221 doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
225 public void testAddShardReplica() throws Exception {
226 String name = "testAddShardReplica";
227 String moduleShardsConfig = "module-shards-cars-member-1.conf";
228 final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
229 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
231 final var newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
232 .moduleShardsConfig(moduleShardsConfig).build();
234 leaderNode1.waitForMembersUp("member-2");
236 doAddShardReplica(newReplicaNode2, "cars", "member-1");
238 var newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
239 .moduleShardsConfig(moduleShardsConfig).build();
241 leaderNode1.waitForMembersUp("member-3");
242 newReplicaNode2.waitForMembersUp("member-3");
244 doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
246 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
247 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
249 // Write data to member-2's config datastore and read/verify via member-3
250 final var configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
251 newReplicaNode3.configDataStore());
253 // Write data to member-3's oper datastore and read/verify via member-2
254 writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
256 // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 -
257 // 2 ServerConfigurationPayload entries, the transaction payload entry plus a purge payload.
259 RaftStateVerifier verifier = raftState -> {
260 assertEquals("Commit index", 3, raftState.getCommitIndex());
261 assertEquals("Last applied index", 3, raftState.getLastApplied());
264 verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
265 verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
267 verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
268 verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
270 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
271 verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
273 // Restart member-3 and verify the cars config shard is re-instated.
275 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
276 newReplicaNode3.cleanup();
278 newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
279 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
281 verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
282 readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
286 public void testAddShardReplicaFailures() throws Exception {
287 String name = "testAddShardReplicaFailures";
288 final var memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
289 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
291 final var service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), null);
293 var rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
294 .setDataStoreType(DataStoreType.Config)
296 .get(10, TimeUnit.SECONDS);
297 verifyFailedRpcResult(rpcResult);
299 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars").build())
300 .get(10, TimeUnit.SECONDS);
301 verifyFailedRpcResult(rpcResult);
303 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
304 .setDataStoreType(DataStoreType.Config)
306 .get(10, TimeUnit.SECONDS);
307 verifyFailedRpcResult(rpcResult);
310 private static ContainerNode writeCarsNodeAndVerify(final AbstractDataStore writeToStore,
311 final AbstractDataStore readFromStore) throws Exception {
312 final var writeTx = writeToStore.newWriteOnlyTransaction();
313 final var carsNode = CarsModel.create();
314 writeTx.write(CarsModel.BASE_PATH, carsNode);
316 final var cohort = writeTx.ready();
317 assertEquals("canCommit", TRUE, cohort.canCommit().get(7, TimeUnit.SECONDS));
318 cohort.preCommit().get(5, TimeUnit.SECONDS);
319 cohort.commit().get(5, TimeUnit.SECONDS);
321 readCarsNodeAndVerify(readFromStore, carsNode);
325 private static void readCarsNodeAndVerify(final AbstractDataStore readFromStore,
326 final ContainerNode expCarsNode) throws Exception {
327 assertEquals(Optional.of(expCarsNode),
328 readFromStore.newReadOnlyTransaction().read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS));
331 private static void doAddShardReplica(final MemberNode memberNode, final String shardName,
332 final String... peerMemberNames) throws Exception {
333 memberNode.waitForMembersUp(peerMemberNames);
335 final var service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), null);
337 var rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
338 .setShardName(shardName)
339 .setDataStoreType(DataStoreType.Config)
340 .build()).get(10, TimeUnit.SECONDS);
341 verifySuccessfulRpcResult(rpcResult);
343 verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
345 assertEquals(Optional.empty(), memberNode.operDataStore().getActorUtils().findLocalShard(shardName));
347 rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
348 .setShardName(shardName)
349 .setDataStoreType(DataStoreType.Operational)
350 .build()).get(10, TimeUnit.SECONDS);
351 verifySuccessfulRpcResult(rpcResult);
353 verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
356 private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName,
357 final String newLeader) throws Exception {
358 final var service = new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), null);
360 final var rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
361 .setDataStoreType(DataStoreType.Config)
362 .setShardName(shardName)
363 .build()).get(10, TimeUnit.SECONDS);
365 verifySuccessfulRpcResult(rpcResult);
367 verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
368 containsString(newLeader)));
371 private static <T> T verifySuccessfulRpcResult(final RpcResult<T> rpcResult) {
372 if (!rpcResult.isSuccessful()) {
373 final var errors = rpcResult.getErrors();
374 if (errors.size() > 0) {
375 final var error = errors.get(0);
376 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
379 fail("Rpc failed with no error");
382 return rpcResult.getResult();
385 private static void verifyFailedRpcResult(final RpcResult<?> rpcResult) {
386 assertFalse("RpcResult", rpcResult.isSuccessful());
387 final var errors = rpcResult.getErrors();
388 assertEquals("RpcResult errors size", 1, errors.size());
389 final var error = errors.get(0);
390 assertNotNull("RpcResult error message null", error.getMessage());
394 public void testRemoveShardReplica() throws Exception {
395 String name = "testRemoveShardReplica";
396 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
397 final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
398 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
399 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
402 final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
403 .moduleShardsConfig(moduleShardsConfig).build();
405 final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
406 .moduleShardsConfig(moduleShardsConfig).build();
408 leaderNode1.configDataStore().waitTillReady();
409 replicaNode3.configDataStore().waitTillReady();
410 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
411 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
412 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
414 // Invoke RPC service on member-3 to remove it's local shard
416 final var service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(),
419 var rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
420 .setShardName("cars").setMemberName("member-3")
421 .setDataStoreType(DataStoreType.Config)
422 .build()).get(10, TimeUnit.SECONDS);
423 verifySuccessfulRpcResult(rpcResult);
425 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
426 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
427 verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
429 // Restart member-2 and verify member-3 isn't present.
431 Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
432 replicaNode2.cleanup();
434 final var newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
435 .moduleShardsConfig(moduleShardsConfig).build();
437 newPeplicaNode2.configDataStore().waitTillReady();
438 verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
440 // Invoke RPC service on member-1 to remove member-2
442 final var service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), leaderNode1.operDataStore(),
445 rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
446 .setShardName("cars")
447 .setMemberName("member-2")
448 .setDataStoreType(DataStoreType.Config)
449 .build()).get(10, TimeUnit.SECONDS);
450 verifySuccessfulRpcResult(rpcResult);
452 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
453 verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
457 public void testRemoveShardLeaderReplica() throws Exception {
458 String name = "testRemoveShardLeaderReplica";
459 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
460 final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
461 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
462 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
465 final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
466 .moduleShardsConfig(moduleShardsConfig).build();
468 final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
469 .moduleShardsConfig(moduleShardsConfig).build();
471 leaderNode1.configDataStore().waitTillReady();
472 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
473 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
474 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
476 replicaNode2.waitForMembersUp("member-1", "member-3");
477 replicaNode3.waitForMembersUp("member-1", "member-2");
479 // Invoke RPC service on leader member-1 to remove it's local shard
481 final var service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), leaderNode1.operDataStore(),
484 final var rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
485 .setShardName("cars")
486 .setMemberName("member-1")
487 .setDataStoreType(DataStoreType.Config)
488 .build()).get(10, TimeUnit.SECONDS);
489 verifySuccessfulRpcResult(rpcResult);
491 verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
492 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
493 containsString("member-3"))));
495 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
496 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
497 verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
501 public void testAddReplicasForAllShards() throws Exception {
502 String name = "testAddReplicasForAllShards";
503 String moduleShardsConfig = "module-shards-member1.conf";
504 final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
505 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
507 final var petsModuleConfig = new ModuleShardConfiguration(XMLNamespace.of("pets-ns"), "pets-module", "pets",
508 null, List.of(MEMBER_1));
509 leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
510 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
511 leaderNode1.kit().expectMsgClass(Success.class);
512 leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorUtils(), "pets");
514 final var newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
515 .moduleShardsConfig(moduleShardsConfig).build();
517 leaderNode1.waitForMembersUp("member-2");
518 newReplicaNode2.waitForMembersUp("member-1");
520 newReplicaNode2.configDataStore().getActorUtils().getShardManager().tell(
521 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
522 newReplicaNode2.kit().expectMsgClass(Success.class);
524 newReplicaNode2.operDataStore().getActorUtils().getShardManager()
525 .tell(new CreateShard(new ModuleShardConfiguration(XMLNamespace.of("no-leader-ns"), "no-leader-module",
526 "no-leader", null, List.of(MEMBER_1)),
527 Shard.builder(), null), newReplicaNode2.kit().getRef());
528 newReplicaNode2.kit().expectMsgClass(Success.class);
530 final var service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
531 newReplicaNode2.operDataStore(), null);
533 var rpcResult = service.addReplicasForAllShards(new AddReplicasForAllShardsInputBuilder().build())
534 .get(10, TimeUnit.SECONDS);
535 final var result = verifySuccessfulRpcResult(rpcResult);
536 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
537 successShardResult("people", DataStoreType.Config),
538 successShardResult("pets", DataStoreType.Config),
539 successShardResult("cars", DataStoreType.Operational),
540 successShardResult("people", DataStoreType.Operational),
541 failedShardResult("no-leader", DataStoreType.Operational));
543 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
544 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
545 verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
546 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
547 verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
551 public void testRemoveAllShardReplicas() throws Exception {
552 String name = "testRemoveAllShardReplicas";
553 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
554 final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
555 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
556 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
559 final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
560 .moduleShardsConfig(moduleShardsConfig).build();
562 final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
563 .moduleShardsConfig(moduleShardsConfig).build();
565 leaderNode1.configDataStore().waitTillReady();
566 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
567 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
568 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
570 final var petsModuleConfig = new ModuleShardConfiguration(XMLNamespace.of("pets-ns"), "pets-module", "pets",
571 null, List.of(MEMBER_1, MEMBER_2, MEMBER_3));
572 leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
573 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
574 leaderNode1.kit().expectMsgClass(Success.class);
576 replicaNode2.configDataStore().getActorUtils().getShardManager().tell(
577 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
578 replicaNode2.kit().expectMsgClass(Success.class);
580 replicaNode3.configDataStore().getActorUtils().getShardManager().tell(
581 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
582 replicaNode3.kit().expectMsgClass(Success.class);
584 verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
585 verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
586 verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
588 final var service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(),
591 var rpcResult = service3.removeAllShardReplicas(
592 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build())
593 .get(10, TimeUnit.SECONDS);
594 final var result = verifySuccessfulRpcResult(rpcResult);
595 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
596 successShardResult("people", DataStoreType.Config),
597 successShardResult("pets", DataStoreType.Config),
598 successShardResult("cars", DataStoreType.Operational),
599 successShardResult("people", DataStoreType.Operational));
601 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
602 verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
603 verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
604 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
605 verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
606 verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
607 verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
608 verifyNoShardPresent(replicaNode3.configDataStore(), "people");
609 verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
613 public void testChangeMemberVotingStatesForShard() throws Exception {
614 String name = "testChangeMemberVotingStatusForShard";
615 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
616 final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
617 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
618 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
621 final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
622 .moduleShardsConfig(moduleShardsConfig).build();
624 final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
625 .moduleShardsConfig(moduleShardsConfig).build();
627 leaderNode1.configDataStore().waitTillReady();
628 replicaNode3.configDataStore().waitTillReady();
629 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
630 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
631 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
633 // Invoke RPC service on member-3 to change voting status
635 final var service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(),
638 var rpcResult = service3.changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
639 .setShardName("cars").setDataStoreType(DataStoreType.Config)
640 .setMemberVotingState(List.of(
641 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
642 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
644 .get(10, TimeUnit.SECONDS);
645 verifySuccessfulRpcResult(rpcResult);
647 verifyVotingStates(leaderNode1.configDataStore(), "cars",
648 new ExpState("member-1", true), new ExpState("member-2", false), new ExpState("member-3", false));
649 verifyVotingStates(replicaNode2.configDataStore(), "cars",
650 new ExpState("member-1", true), new ExpState("member-2", false), new ExpState("member-3", false));
651 verifyVotingStates(replicaNode3.configDataStore(), "cars",
652 new ExpState("member-1", true), new ExpState("member-2", false), new ExpState("member-3", false));
656 public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
657 String name = "testChangeMemberVotingStatesForSingleNodeShard";
658 String moduleShardsConfig = "module-shards-member1.conf";
659 final var leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
660 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
661 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
664 leaderNode.configDataStore().waitTillReady();
666 // Invoke RPC service on member-3 to change voting status
668 final var service = new ClusterAdminRpcService(leaderNode.configDataStore(), leaderNode.operDataStore(), null);
670 final var rpcResult = service.changeMemberVotingStatesForShard(
671 new ChangeMemberVotingStatesForShardInputBuilder()
672 .setShardName("cars").setDataStoreType(DataStoreType.Config)
673 .setMemberVotingState(List.of(new MemberVotingStateBuilder()
674 .setMemberName("member-1")
678 .get(10, TimeUnit.SECONDS);
679 verifyFailedRpcResult(rpcResult);
681 verifyVotingStates(leaderNode.configDataStore(), "cars", new ExpState("member-1", true));
685 public void testChangeMemberVotingStatesForAllShards() throws Exception {
686 String name = "testChangeMemberVotingStatesForAllShards";
687 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
688 final var leaderNode1 = MemberNode.builder(memberNodes)
689 .akkaConfig("Member1")
691 .moduleShardsConfig(moduleShardsConfig)
692 .datastoreContextBuilder(DatastoreContext.newBuilder()
693 .shardHeartbeatIntervalInMillis(300)
694 .shardElectionTimeoutFactor(1))
697 final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
698 .moduleShardsConfig(moduleShardsConfig).build();
700 final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
701 .moduleShardsConfig(moduleShardsConfig).build();
703 leaderNode1.configDataStore().waitTillReady();
704 leaderNode1.operDataStore().waitTillReady();
705 replicaNode3.configDataStore().waitTillReady();
706 replicaNode3.operDataStore().waitTillReady();
707 verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
708 verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
709 verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
711 // Invoke RPC service on member-3 to change voting status
713 final var service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
714 replicaNode3.operDataStore(), null);
716 final var rpcResult = service3.changeMemberVotingStatesForAllShards(
717 new ChangeMemberVotingStatesForAllShardsInputBuilder()
718 .setMemberVotingState(List.of(
719 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
720 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
722 .get(10, TimeUnit.SECONDS);
723 final var result = verifySuccessfulRpcResult(rpcResult);
724 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
725 successShardResult("people", DataStoreType.Config),
726 successShardResult("cars", DataStoreType.Operational),
727 successShardResult("people", DataStoreType.Operational));
729 verifyVotingStates(new ClientBackedDataStore[] {
730 leaderNode1.configDataStore(), leaderNode1.operDataStore(),
731 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
732 replicaNode3.configDataStore(), replicaNode3.operDataStore()
733 }, new String[] { "cars", "people" },
734 new ExpState("member-1", true), new ExpState("member-2", false), new ExpState("member-3", false));
738 public void testFlipMemberVotingStates() throws Exception {
739 String name = "testFlipMemberVotingStates";
741 final var persistedServerConfig = new ServerConfigurationPayload(List.of(
742 new ServerInfo("member-1", true), new ServerInfo("member-2", true), new ServerInfo("member-3", false)));
744 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
745 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
746 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
748 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
749 final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
750 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder()
751 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10))
754 final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
755 .moduleShardsConfig(moduleShardsConfig).build();
757 final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
758 .moduleShardsConfig(moduleShardsConfig).build();
760 leaderNode1.configDataStore().waitTillReady();
761 leaderNode1.operDataStore().waitTillReady();
762 replicaNode3.configDataStore().waitTillReady();
763 replicaNode3.operDataStore().waitTillReady();
764 verifyVotingStates(leaderNode1.configDataStore(), "cars",
765 new ExpState("member-1", true), new ExpState("member-2", true), new ExpState("member-3", false));
767 final var service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), replicaNode3.operDataStore(),
770 var rpcResult = service3.flipMemberVotingStatesForAllShards(
771 new FlipMemberVotingStatesForAllShardsInputBuilder().build())
772 .get(10, TimeUnit.SECONDS);
773 var result = verifySuccessfulRpcResult(rpcResult);
774 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
775 successShardResult("people", DataStoreType.Config),
776 successShardResult("cars", DataStoreType.Operational),
777 successShardResult("people", DataStoreType.Operational));
779 verifyVotingStates(new ClientBackedDataStore[] {
780 leaderNode1.configDataStore(), leaderNode1.operDataStore(),
781 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
782 replicaNode3.configDataStore(), replicaNode3.operDataStore()
783 }, new String[] { "cars", "people" },
784 new ExpState("member-1", false), new ExpState("member-2", false), new ExpState("member-3", true));
786 // Leadership should have transferred to member 3 since it is the only remaining voting member.
787 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
788 assertNotNull("Expected non-null leader Id", raftState.getLeader());
789 assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
790 raftState.getLeader().contains("member-3"));
793 verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
794 assertNotNull("Expected non-null leader Id", raftState.getLeader());
795 assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
796 raftState.getLeader().contains("member-3"));
799 // Flip the voting states back to the original states.
801 rpcResult = service3.flipMemberVotingStatesForAllShards(
802 new FlipMemberVotingStatesForAllShardsInputBuilder().build())
803 .get(10, TimeUnit.SECONDS);
804 result = verifySuccessfulRpcResult(rpcResult);
805 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
806 successShardResult("people", DataStoreType.Config),
807 successShardResult("cars", DataStoreType.Operational),
808 successShardResult("people", DataStoreType.Operational));
810 verifyVotingStates(new ClientBackedDataStore[] {
811 leaderNode1.configDataStore(), leaderNode1.operDataStore(),
812 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
813 replicaNode3.configDataStore(), replicaNode3.operDataStore()
814 }, new String[] { "cars", "people" },
815 new ExpState("member-1", true), new ExpState("member-2", true), new ExpState("member-3", false));
817 // Leadership should have transferred to member 1 or 2.
818 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
819 assertNotNull("Expected non-null leader Id", raftState.getLeader());
820 assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
821 raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
826 public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
827 String name = "testFlipMemberVotingStatesWithNoInitialLeader";
829 // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
830 // non-voting and simulated as down by not starting them up.
831 final var persistedServerConfig = new ServerConfigurationPayload(List.of(
832 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
833 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
834 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
836 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
837 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
838 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
840 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
841 final var replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
842 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
843 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
846 final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
847 .moduleShardsConfig(moduleShardsConfig).build();
849 final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
850 .moduleShardsConfig(moduleShardsConfig).build();
852 // Initially there won't be a leader b/c all the up nodes are non-voting.
854 replicaNode1.waitForMembersUp("member-2", "member-3");
856 verifyVotingStates(replicaNode1.configDataStore(), "cars",
857 new ExpState("member-1", false), new ExpState("member-2", false), new ExpState("member-3", false),
858 new ExpState("member-4", true), new ExpState("member-5", true), new ExpState("member-6", true));
860 verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
861 assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
863 final var service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(), replicaNode1.operDataStore(),
866 final var rpcResult = service1.flipMemberVotingStatesForAllShards(
867 new FlipMemberVotingStatesForAllShardsInputBuilder().build())
868 .get(10, TimeUnit.SECONDS);
869 final var result = verifySuccessfulRpcResult(rpcResult);
870 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
871 successShardResult("people", DataStoreType.Config),
872 successShardResult("cars", DataStoreType.Operational),
873 successShardResult("people", DataStoreType.Operational));
875 verifyVotingStates(new ClientBackedDataStore[] {
876 replicaNode1.configDataStore(), replicaNode1.operDataStore(),
877 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
878 replicaNode3.configDataStore(), replicaNode3.operDataStore()
879 }, new String[] { "cars", "people" },
880 new ExpState("member-1", true), new ExpState("member-2", true), new ExpState("member-3", true),
881 new ExpState("member-4", false), new ExpState("member-5", false), new ExpState("member-6", false));
883 // Since member 1 was changed to voting and there was no leader, it should've started and election
885 verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
886 assertNotNull("Expected non-null leader Id", raftState.getLeader());
887 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
888 raftState.getLeader().contains("member-1"));
891 verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
892 assertNotNull("Expected non-null leader Id", raftState.getLeader());
893 assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
894 raftState.getLeader().contains("member-1"));
899 public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
900 String name = "testFlipMemberVotingStatesWithVotingMembersDown";
902 // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
903 final var persistedServerConfig = new ServerConfigurationPayload(List.of(
904 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
905 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
906 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
908 setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
909 setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
910 setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
912 String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
913 final var leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
914 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
915 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
918 final var replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
919 .moduleShardsConfig(moduleShardsConfig).build();
921 final var replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
922 .moduleShardsConfig(moduleShardsConfig).build();
924 leaderNode1.configDataStore().waitTillReady();
925 leaderNode1.operDataStore().waitTillReady();
926 verifyVotingStates(leaderNode1.configDataStore(), "cars",
927 new ExpState("member-1", true), new ExpState("member-2", true), new ExpState("member-3", true),
928 new ExpState("member-4", false), new ExpState("member-5", false), new ExpState("member-6", false));
930 final var service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), leaderNode1.operDataStore(),
933 final var rpcResult = service1.flipMemberVotingStatesForAllShards(
934 new FlipMemberVotingStatesForAllShardsInputBuilder().build())
935 .get(10, TimeUnit.SECONDS);
936 final var result = verifySuccessfulRpcResult(rpcResult);
937 verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
938 successShardResult("people", DataStoreType.Config),
939 successShardResult("cars", DataStoreType.Operational),
940 successShardResult("people", DataStoreType.Operational));
942 // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
943 verifyVotingStates(new ClientBackedDataStore[] {
944 leaderNode1.configDataStore(), leaderNode1.operDataStore(),
945 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
946 replicaNode3.configDataStore(), replicaNode3.operDataStore()
947 }, new String[] { "cars", "people" },
948 new ExpState("member-1", false), new ExpState("member-2", false), new ExpState("member-3", false),
949 new ExpState("member-4", true), new ExpState("member-5", true), new ExpState("member-6", true));
951 // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
952 // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
953 verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
954 assertNotNull("Expected non-null leader Id", raftState.getLeader());
955 assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
959 private static void setupPersistedServerConfigPayload(final ServerConfigurationPayload serverConfig,
960 final String member, final String datastoreTypeSuffix, final String... shards) {
961 String[] datastoreTypes = { "config_", "oper_" };
962 for (String type : datastoreTypes) {
963 for (String shard : shards) {
964 final var newServerInfo = new ArrayList<ServerInfo>(serverConfig.getServerConfig().size());
965 for (var info : serverConfig.getServerConfig()) {
966 newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.peerId()),
967 type + datastoreTypeSuffix).toString(), info.isVoting()));
970 final String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
971 type + datastoreTypeSuffix).toString();
972 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
973 InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,
974 new ServerConfigurationPayload(newServerInfo)));
979 private static void verifyVotingStates(final ClientBackedDataStore[] datastores, final String[] shards,
980 final ExpState... expStates) throws Exception {
981 for (var datastore : datastores) {
982 for (String shard : shards) {
983 verifyVotingStates(datastore, shard, expStates);
988 private static void verifyVotingStates(final ClientBackedDataStore datastore, final String shardName,
989 final ExpState... expStates) throws Exception {
990 String localMemberName = datastore.getActorUtils().getCurrentMemberName().getName();
991 var expStateMap = new HashMap<String, Boolean>();
992 for (var expState : expStates) {
993 expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(expState.name),
994 datastore.getActorUtils().getDataStoreName()).toString(), expState.voting);
997 verifyRaftState(datastore, shardName, raftState -> {
998 String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
999 datastore.getActorUtils().getDataStoreName()).toString();
1000 assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
1001 for (var entry : raftState.getPeerVotingStates().entrySet()) {
1002 assertEquals("Voting state for " + entry.getKey(), expStateMap.get(entry.getKey()), entry.getValue());
1007 private static void verifyShardResults(final Map<ShardResultKey, ShardResult> shardResults,
1008 final ShardResult... expShardResults) {
1009 var expResultsMap = new HashMap<String, ShardResult>();
1010 for (var r : expShardResults) {
1011 expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1014 for (var result : shardResults.values()) {
1015 var exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1016 assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1017 result.getDataStoreType()), exp);
1018 assertEquals("isSucceeded", exp.getSucceeded(), result.getSucceeded());
1019 if (exp.getSucceeded()) {
1020 assertNull("Expected null error message", result.getErrorMessage());
1022 assertNotNull("Expected error message", result.getErrorMessage());
1026 if (!expResultsMap.isEmpty()) {
1027 fail("Missing shard results for " + expResultsMap.keySet());
1031 private static ShardResult successShardResult(final String shardName, final DataStoreType type) {
1032 return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1035 private static ShardResult failedShardResult(final String shardName, final DataStoreType type) {
1036 return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();