Add cluster-admin api for datacenter activation
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.hamcrest.MatcherAssert.assertThat;
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertFalse;
17 import static org.junit.Assert.assertNotNull;
18 import static org.junit.Assert.assertNull;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
24
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.collect.Iterables;
31 import com.google.common.collect.Lists;
32 import java.io.File;
33 import java.io.FileInputStream;
34 import java.util.AbstractMap.SimpleEntry;
35 import java.util.ArrayList;
36 import java.util.HashMap;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Optional;
42 import java.util.Set;
43 import java.util.concurrent.TimeUnit;
44 import org.apache.commons.lang3.SerializationUtils;
45 import org.junit.After;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.opendaylight.controller.cluster.access.concepts.MemberName;
49 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
50 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
51 import org.opendaylight.controller.cluster.datastore.MemberNode;
52 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
53 import org.opendaylight.controller.cluster.datastore.Shard;
54 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
55 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
56 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
57 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
58 import org.opendaylight.controller.cluster.raft.RaftState;
59 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
60 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
61 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
62 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
63 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
64 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
65 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
66 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
67 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInputBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey;
91 import org.opendaylight.yangtools.yang.common.RpcError;
92 import org.opendaylight.yangtools.yang.common.RpcResult;
93 import org.opendaylight.yangtools.yang.common.XMLNamespace;
94 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
95
96 /**
97  * Unit tests for ClusterAdminRpcService.
98  *
99  * @author Thomas Pantelis
100  */
101 public class ClusterAdminRpcServiceTest {
    // Member names used by the tests when listing the members of a dynamically created shard.
    private static final MemberName MEMBER_1 = MemberName.forName("member-1");
    private static final MemberName MEMBER_2 = MemberName.forName("member-2");
    private static final MemberName MEMBER_3 = MemberName.forName("member-3");
    // Passed to MemberNode.builder(...) by every test; tearDown() cleans up the nodes it contains.
    private final List<MemberNode> memberNodes = new ArrayList<>();
106
107     @Before
108     public void setUp() {
109         InMemoryJournal.clear();
110         InMemorySnapshotStore.clear();
111     }
112
113     @After
114     public void tearDown() {
115         for (MemberNode m : Lists.reverse(memberNodes)) {
116             m.cleanup();
117         }
118         memberNodes.clear();
119     }
120
121     @Test
122     public void testBackupDatastore() throws Exception {
123         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
124                 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
125                 .testName("testBackupDatastore").build();
126
127         String fileName = "target/testBackupDatastore";
128         new File(fileName).delete();
129
130         final ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(),
131                 null, null);
132
133         RpcResult<BackupDatastoreOutput> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
134                 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
135         verifySuccessfulRpcResult(rpcResult);
136
137         try (FileInputStream fis = new FileInputStream(fileName)) {
138             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
139             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
140
141             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
142                     snapshots.get(1).getType(), snapshots.get(1));
143             verifyDatastoreSnapshot(node.configDataStore().getActorUtils().getDataStoreName(),
144                     map.get(node.configDataStore().getActorUtils().getDataStoreName()), "cars", "people");
145         } finally {
146             new File(fileName).delete();
147         }
148
149         // Test failure by killing a shard.
150
151         node.configDataStore().getActorUtils().getShardManager().tell(node.datastoreContextBuilder()
152                 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
153
154         ActorRef carsShardActor = node.configDataStore().getActorUtils().findLocalShard("cars").get();
155         node.kit().watch(carsShardActor);
156         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
157         node.kit().expectTerminated(carsShardActor);
158
159         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
160                 .get(5, TimeUnit.SECONDS);
161         assertFalse("isSuccessful", rpcResult.isSuccessful());
162         assertEquals("getErrors", 1, rpcResult.getErrors().size());
163     }
164
165     private static void verifyDatastoreSnapshot(final String type, final DatastoreSnapshot datastoreSnapshot,
166             final String... expShardNames) {
167         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
168         Set<String> shardNames = new HashSet<>();
169         for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
170             shardNames.add(s.getName());
171         }
172
173         assertEquals("DatastoreSnapshot shard names", Set.of(expShardNames), shardNames);
174     }
175
176     @Test
177     public void testGetPrefixShardRole() throws Exception {
178         String name = "testGetPrefixShardRole";
179         String moduleShardsConfig = "module-shards-default-member-1.conf";
180
181         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
182                 .moduleShardsConfig(moduleShardsConfig).build();
183
184         member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default");
185     }
186
    /**
     * Starts three members, adds "cars" replicas on member-2 and member-3, then moves the "cars" shard
     * leadership to each member in turn via the make-leader-local RPC and verifies that the other members
     * report the new leader.
     */
    @Test
    public void testModuleShardLeaderMovement() throws Exception {
        String name = "testModuleShardLeaderMovement";
        String moduleShardsConfig = "module-shards-member1.conf";

        final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
                .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
        final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();
        final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();

        // Wait for the members to see each other before adding any replicas.
        member1.waitForMembersUp("member-2", "member-3");
        replicaNode2.waitForMembersUp("member-1");
        replicaNode3.waitForMembersUp("member-1", "member-2");

        doAddShardReplica(replicaNode2, "cars", "member-1");
        doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");

        // Every member should now list the other two as raft peers of the config "cars" shard.
        verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");

        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");

        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");

        // Make member-1 the leader and check that the other two members agree.
        doMakeShardLeaderLocal(member1, "cars", "member-1");
        verifyRaftState(replicaNode2.configDataStore(), "cars",
            raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
        verifyRaftState(replicaNode3.configDataStore(), "cars",
            raftState -> assertThat(raftState.getLeader(),containsString("member-1")));

        // Move leadership to member-2 and check that the other two members agree.
        doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
        verifyRaftState(member1.configDataStore(), "cars",
            raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
        verifyRaftState(replicaNode3.configDataStore(), "cars",
            raftState -> assertThat(raftState.getLeader(),containsString("member-2")));

        // Finally move leadership to member-3; doMakeShardLeaderLocal verifies the leader on that node itself.
        replicaNode2.waitForMembersUp("member-3");
        doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
    }
227
228     @Test
229     public void testAddShardReplica() throws Exception {
230         String name = "testAddShardReplica";
231         String moduleShardsConfig = "module-shards-cars-member-1.conf";
232         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
233                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
234
235         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
236                 .moduleShardsConfig(moduleShardsConfig).build();
237
238         leaderNode1.waitForMembersUp("member-2");
239
240         doAddShardReplica(newReplicaNode2, "cars", "member-1");
241
242         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
243                 .moduleShardsConfig(moduleShardsConfig).build();
244
245         leaderNode1.waitForMembersUp("member-3");
246         newReplicaNode2.waitForMembersUp("member-3");
247
248         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
249
250         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
251         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
252
253         // Write data to member-2's config datastore and read/verify via member-3
254         final NormalizedNode configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
255                 newReplicaNode3.configDataStore());
256
257         // Write data to member-3's oper datastore and read/verify via member-2
258         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
259
260         // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 -
261         // 2 ServerConfigurationPayload entries,  the transaction payload entry plus a purge payload.
262
263         RaftStateVerifier verifier = raftState -> {
264             assertEquals("Commit index", 4, raftState.getCommitIndex());
265             assertEquals("Last applied index", 4, raftState.getLastApplied());
266         };
267
268         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
269         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
270
271         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
272         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
273
274         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
275         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
276
277         // Restart member-3 and verify the cars config shard is re-instated.
278
279         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
280         newReplicaNode3.cleanup();
281
282         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
283                 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
284
285         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
286         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
287     }
288
289     @Test
290     public void testAddShardReplicaFailures() throws Exception {
291         String name = "testAddShardReplicaFailures";
292         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
293                 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
294
295         final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
296                 memberNode.operDataStore(), null, null);
297
298         RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
299                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
300         verifyFailedRpcResult(rpcResult);
301
302         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
303                 .build()).get(10, TimeUnit.SECONDS);
304         verifyFailedRpcResult(rpcResult);
305
306         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
307                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
308         verifyFailedRpcResult(rpcResult);
309     }
310
311     private static NormalizedNode writeCarsNodeAndVerify(final AbstractDataStore writeToStore,
312             final AbstractDataStore readFromStore) throws Exception {
313         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
314         NormalizedNode carsNode = CarsModel.create();
315         writeTx.write(CarsModel.BASE_PATH, carsNode);
316
317         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
318         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
319         assertEquals("canCommit", TRUE, canCommit);
320         cohort.preCommit().get(5, TimeUnit.SECONDS);
321         cohort.commit().get(5, TimeUnit.SECONDS);
322
323         readCarsNodeAndVerify(readFromStore, carsNode);
324         return carsNode;
325     }
326
327     private static void readCarsNodeAndVerify(final AbstractDataStore readFromStore,
328             final NormalizedNode expCarsNode) throws Exception {
329         Optional<NormalizedNode> optional = readFromStore.newReadOnlyTransaction().read(CarsModel.BASE_PATH)
330                 .get(15, TimeUnit.SECONDS);
331         assertTrue("isPresent", optional.isPresent());
332         assertEquals("Data node", expCarsNode, optional.get());
333     }
334
335     private static void doAddShardReplica(final MemberNode memberNode, final String shardName,
336             final String... peerMemberNames) throws Exception {
337         memberNode.waitForMembersUp(peerMemberNames);
338
339         final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
340                 memberNode.operDataStore(), null, null);
341
342         RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
343             .setShardName(shardName).setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
344         verifySuccessfulRpcResult(rpcResult);
345
346         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
347
348         Optional<ActorRef> optional = memberNode.operDataStore().getActorUtils().findLocalShard(shardName);
349         assertFalse("Oper shard present", optional.isPresent());
350
351         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
352                 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
353         verifySuccessfulRpcResult(rpcResult);
354
355         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
356     }
357
358     private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName,
359             final String newLeader) throws Exception {
360         final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
361                 memberNode.operDataStore(), null, null);
362
363         final RpcResult<MakeLeaderLocalOutput> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
364                 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
365                 .get(10, TimeUnit.SECONDS);
366
367         verifySuccessfulRpcResult(rpcResult);
368
369         verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
370                 containsString(newLeader)));
371     }
372
373     private static <T> T verifySuccessfulRpcResult(final RpcResult<T> rpcResult) {
374         if (!rpcResult.isSuccessful()) {
375             if (rpcResult.getErrors().size() > 0) {
376                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
377                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
378             }
379
380             fail("Rpc failed with no error");
381         }
382
383         return rpcResult.getResult();
384     }
385
386     private static void verifyFailedRpcResult(final RpcResult<?> rpcResult) {
387         assertFalse("RpcResult", rpcResult.isSuccessful());
388         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
389         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
390         assertNotNull("RpcResult error message null", error.getMessage());
391     }
392
393     @Test
394     public void testRemoveShardReplica() throws Exception {
395         String name = "testRemoveShardReplica";
396         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
397         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
398                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
399                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
400                 .build();
401
402         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
403                 .moduleShardsConfig(moduleShardsConfig).build();
404
405         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
406                 .moduleShardsConfig(moduleShardsConfig).build();
407
408         leaderNode1.configDataStore().waitTillReady();
409         replicaNode3.configDataStore().waitTillReady();
410         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
411         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
412         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
413
414         // Invoke RPC service on member-3 to remove it's local shard
415
416         final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
417                 replicaNode3.operDataStore(), null, null);
418
419         RpcResult<RemoveShardReplicaOutput> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
420                 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
421                 .get(10, TimeUnit.SECONDS);
422         verifySuccessfulRpcResult(rpcResult);
423
424         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
425         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
426         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
427
428         // Restart member-2 and verify member-3 isn't present.
429
430         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
431         replicaNode2.cleanup();
432
433         MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
434                 .moduleShardsConfig(moduleShardsConfig).build();
435
436         newPeplicaNode2.configDataStore().waitTillReady();
437         verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
438
439         // Invoke RPC service on member-1 to remove member-2
440
441         final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
442                 leaderNode1.operDataStore(), null, null);
443
444         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
445                 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
446         verifySuccessfulRpcResult(rpcResult);
447
448         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
449         verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
450     }
451
    /**
     * Removes the config "cars" replica from member-1 via member-1's own RPC service and verifies that
     * member-2 or member-3 becomes the leader, that the two remaining members are each other's only
     * peer, and that member-1 no longer hosts the shard.
     */
    @Test
    public void testRemoveShardLeaderReplica() throws Exception {
        String name = "testRemoveShardLeaderReplica";
        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
        final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
                .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
                        DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
                .build();

        final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();

        final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();

        leaderNode1.configDataStore().waitTillReady();
        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");

        // Make sure both replicas see the other members as up before the replica is removed.
        replicaNode2.waitForMembersUp("member-1", "member-3");
        replicaNode3.waitForMembersUp("member-1", "member-2");

        // Invoke RPC service on leader member-1 to remove its local shard

        final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
                leaderNode1.operDataStore(), null, null);

        RpcResult<RemoveShardReplicaOutput> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
                .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
                .get(10, TimeUnit.SECONDS);
        verifySuccessfulRpcResult(rpcResult);

        // Either remaining member may become the leader, so accept both.
        verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
                assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
                        containsString("member-3"))));

        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
        verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
    }
493
    /**
     * Verifies the add-replicas-for-all-shards RPC invoked on member-2: the "cars", "people" and "pets"
     * config shards and the "cars" and "people" operational shards are expected to succeed, while the
     * operational "no-leader" shard is expected to fail.
     */
    @Test
    public void testAddReplicasForAllShards() throws Exception {
        String name = "testAddReplicasForAllShards";
        String moduleShardsConfig = "module-shards-member1.conf";
        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
                .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();

        // Dynamically create a "pets" config shard on member-1 and wait until it has a leader.
        ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(
            XMLNamespace.of("pets-ns"), "pets-module", "pets", null, List.of(MEMBER_1));
        leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
                new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
        leaderNode1.kit().expectMsgClass(Success.class);
        leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorUtils(), "pets");

        MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
                .moduleShardsConfig(moduleShardsConfig).build();

        leaderNode1.waitForMembersUp("member-2");
        newReplicaNode2.waitForMembersUp("member-1");

        // Send the same "pets" CreateShard (member list contains only MEMBER_1) to member-2's config shard manager.
        newReplicaNode2.configDataStore().getActorUtils().getShardManager().tell(
                new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
        newReplicaNode2.kit().expectMsgClass(Success.class);

        // Create an operational "no-leader" shard on member-2 only; it was never created on member-1, so
        // presumably no leader exists for it, which is why its result below is expected to be a failure.
        newReplicaNode2.operDataStore().getActorUtils().getShardManager().tell(
                new CreateShard(new ModuleShardConfiguration(XMLNamespace.of("no-leader-ns"), "no-leader-module",
                                                             "no-leader", null, List.of(MEMBER_1)),
                                Shard.builder(), null),
                                newReplicaNode2.kit().getRef());
        newReplicaNode2.kit().expectMsgClass(Success.class);

        final ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
                newReplicaNode2.operDataStore(), null, null);

        RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards(
            new AddReplicasForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
        AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
        verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
                successShardResult("people", DataStoreType.Config),
                successShardResult("pets", DataStoreType.Config),
                successShardResult("cars", DataStoreType.Operational),
                successShardResult("people", DataStoreType.Operational),
                failedShardResult("no-leader", DataStoreType.Operational));

        // Each successfully added replica should list member-1 as a raft peer.
        verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
        verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
        verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
        verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
        verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
    }
544
545     @Test
546     public void testRemoveAllShardReplicas() throws Exception {
547         String name = "testRemoveAllShardReplicas";
548         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
549         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
550                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
551                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
552                 .build();
553
554         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
555                 .moduleShardsConfig(moduleShardsConfig).build();
556
557         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
558                 .moduleShardsConfig(moduleShardsConfig).build();
559
560         leaderNode1.configDataStore().waitTillReady();
561         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
562         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
563         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
564
565         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(XMLNamespace.of("pets-ns"),
566                 "pets-module", "pets", null, List.of(MEMBER_1, MEMBER_2, MEMBER_3));
567         leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
568                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
569         leaderNode1.kit().expectMsgClass(Success.class);
570
571         replicaNode2.configDataStore().getActorUtils().getShardManager().tell(
572                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
573         replicaNode2.kit().expectMsgClass(Success.class);
574
575         replicaNode3.configDataStore().getActorUtils().getShardManager().tell(
576                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
577         replicaNode3.kit().expectMsgClass(Success.class);
578
579         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
580         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
581         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
582
583         final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
584                 replicaNode3.operDataStore(), null, null);
585
586         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
587                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
588         RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
589         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
590                 successShardResult("people", DataStoreType.Config),
591                 successShardResult("pets", DataStoreType.Config),
592                 successShardResult("cars", DataStoreType.Operational),
593                 successShardResult("people", DataStoreType.Operational));
594
595         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
596         verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
597         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
598         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
599         verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
600         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
601         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
602         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
603         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
604     }
605
606     @Test
607     public void testChangeMemberVotingStatesForShard() throws Exception {
608         String name = "testChangeMemberVotingStatusForShard";
609         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
610         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
611                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
612                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
613                 .build();
614
615         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
616                 .moduleShardsConfig(moduleShardsConfig).build();
617
618         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
619                 .moduleShardsConfig(moduleShardsConfig).build();
620
621         leaderNode1.configDataStore().waitTillReady();
622         replicaNode3.configDataStore().waitTillReady();
623         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
624         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
625         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
626
627         // Invoke RPC service on member-3 to change voting status
628
629         final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
630                 replicaNode3.operDataStore(), null, null);
631
632         RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service3
633                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
634                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
635                         .setMemberVotingState(List.of(
636                                 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
637                                 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
638                         .build())
639                 .get(10, TimeUnit.SECONDS);
640         verifySuccessfulRpcResult(rpcResult);
641
642         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
643                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
644         verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
645                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
646         verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
647                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
648     }
649
650     @Test
651     public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
652         String name = "testChangeMemberVotingStatesForSingleNodeShard";
653         String moduleShardsConfig = "module-shards-member1.conf";
654         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
655                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
656                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
657                 .build();
658
659         leaderNode.configDataStore().waitTillReady();
660
661         // Invoke RPC service on member-3 to change voting status
662
663         final ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
664                 leaderNode.operDataStore(), null, null);
665
666         RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service
667                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
668                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
669                         .setMemberVotingState(List.of(new MemberVotingStateBuilder()
670                             .setMemberName("member-1")
671                             .setVoting(FALSE)
672                             .build()))
673                         .build())
674                 .get(10, TimeUnit.SECONDS);
675         verifyFailedRpcResult(rpcResult);
676
677         verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
678     }
679
680     @Test
681     public void testChangeMemberVotingStatesForAllShards() throws Exception {
682         String name = "testChangeMemberVotingStatesForAllShards";
683         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
684         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
685                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
686                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
687                 .build();
688
689         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
690                 .moduleShardsConfig(moduleShardsConfig).build();
691
692         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
693                 .moduleShardsConfig(moduleShardsConfig).build();
694
695         leaderNode1.configDataStore().waitTillReady();
696         leaderNode1.operDataStore().waitTillReady();
697         replicaNode3.configDataStore().waitTillReady();
698         replicaNode3.operDataStore().waitTillReady();
699         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
700         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
701         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
702
703         // Invoke RPC service on member-3 to change voting status
704
705         final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
706                 replicaNode3.operDataStore(), null, null);
707
708         RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
709                 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(List.of(
710                         new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
711                         new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
712                 .get(10, TimeUnit.SECONDS);
713         ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
714         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
715                 successShardResult("people", DataStoreType.Config),
716                 successShardResult("cars", DataStoreType.Operational),
717                 successShardResult("people", DataStoreType.Operational));
718
719         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
720                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
721                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
722                 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
723                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
724     }
725
726     @Test
727     public void testFlipMemberVotingStates() throws Exception {
728         String name = "testFlipMemberVotingStates";
729
730         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(List.of(
731                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
732                 new ServerInfo("member-3", false)));
733
734         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
735         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
736         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
737
738         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
739         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
740                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder()
741                         .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10))
742                 .build();
743
744         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
745                 .moduleShardsConfig(moduleShardsConfig).build();
746
747         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
748                 .moduleShardsConfig(moduleShardsConfig).build();
749
750         leaderNode1.configDataStore().waitTillReady();
751         leaderNode1.operDataStore().waitTillReady();
752         replicaNode3.configDataStore().waitTillReady();
753         replicaNode3.operDataStore().waitTillReady();
754         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
755                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
756
757         final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
758                 replicaNode3.operDataStore(), null, null);
759
760         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards(
761             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
762         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
763         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
764                 successShardResult("people", DataStoreType.Config),
765                 successShardResult("cars", DataStoreType.Operational),
766                 successShardResult("people", DataStoreType.Operational));
767
768         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
769                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
770                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
771                 new String[]{"cars", "people"},
772                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
773                 new SimpleEntry<>("member-3", TRUE));
774
775         // Leadership should have transferred to member 3 since it is the only remaining voting member.
776         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
777             assertNotNull("Expected non-null leader Id", raftState.getLeader());
778             assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
779                     raftState.getLeader().contains("member-3"));
780         });
781
782         verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
783             assertNotNull("Expected non-null leader Id", raftState.getLeader());
784             assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
785                     raftState.getLeader().contains("member-3"));
786         });
787
788         // Flip the voting states back to the original states.
789
790         rpcResult = service3.flipMemberVotingStatesForAllShards(
791             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
792         result = verifySuccessfulRpcResult(rpcResult);
793         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
794                 successShardResult("people", DataStoreType.Config),
795                 successShardResult("cars", DataStoreType.Operational),
796                 successShardResult("people", DataStoreType.Operational));
797
798         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
799                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
800                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
801                 new String[]{"cars", "people"},
802                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
803                 new SimpleEntry<>("member-3", FALSE));
804
805         // Leadership should have transferred to member 1 or 2.
806         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
807             assertNotNull("Expected non-null leader Id", raftState.getLeader());
808             assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
809                     raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
810         });
811     }
812
813     @Test
814     public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
815         String name = "testFlipMemberVotingStatesWithNoInitialLeader";
816
817         // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
818         // non-voting and simulated as down by not starting them up.
819         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(List.of(
820                 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
821                 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
822                 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
823
824         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
825         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
826         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
827
828         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
829         final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
830                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
831                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
832                 .build();
833
834         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
835                 .moduleShardsConfig(moduleShardsConfig).build();
836
837         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
838                 .moduleShardsConfig(moduleShardsConfig).build();
839
840         // Initially there won't be a leader b/c all the up nodes are non-voting.
841
842         replicaNode1.waitForMembersUp("member-2", "member-3");
843
844         verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
845                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
846                 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
847                 new SimpleEntry<>("member-6", TRUE));
848
849         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
850             assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
851
852         final ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
853                 replicaNode1.operDataStore(), null, null);
854
855         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
856             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
857         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
858         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
859                 successShardResult("people", DataStoreType.Config),
860                 successShardResult("cars", DataStoreType.Operational),
861                 successShardResult("people", DataStoreType.Operational));
862
863         verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
864                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
865                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
866                 new String[]{"cars", "people"},
867                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
868                 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
869                 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
870
871         // Since member 1 was changed to voting and there was no leader, it should've started and election
872         // and become leader
873         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
874             assertNotNull("Expected non-null leader Id", raftState.getLeader());
875             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
876                     raftState.getLeader().contains("member-1"));
877         });
878
879         verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
880             assertNotNull("Expected non-null leader Id", raftState.getLeader());
881             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
882                     raftState.getLeader().contains("member-1"));
883         });
884     }
885
886     @Test
887     public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
888         String name = "testFlipMemberVotingStatesWithVotingMembersDown";
889
890         // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
891         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(List.of(
892                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
893                 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
894                 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
895
896         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
897         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
898         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
899
900         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
901         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
902                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
903                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
904                 .build();
905
906         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
907                 .moduleShardsConfig(moduleShardsConfig).build();
908
909         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
910                 .moduleShardsConfig(moduleShardsConfig).build();
911
912         leaderNode1.configDataStore().waitTillReady();
913         leaderNode1.operDataStore().waitTillReady();
914         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
915                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
916                 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
917                 new SimpleEntry<>("member-6", FALSE));
918
919         final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
920                 leaderNode1.operDataStore(), null, null);
921
922         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
923             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
924         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
925         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
926                 successShardResult("people", DataStoreType.Config),
927                 successShardResult("cars", DataStoreType.Operational),
928                 successShardResult("people", DataStoreType.Operational));
929
930         // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
931         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
932                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
933                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
934                 new String[]{"cars", "people"},
935                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
936                 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
937                 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
938
939         // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
940         // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
941         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
942             assertNotNull("Expected non-null leader Id", raftState.getLeader());
943             assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
944         });
945     }
946
947     private static void setupPersistedServerConfigPayload(final ServerConfigurationPayload serverConfig,
948             final String member, final String datastoreTypeSuffix, final String... shards) {
949         String[] datastoreTypes = {"config_", "oper_"};
950         for (String type : datastoreTypes) {
951             for (String shard : shards) {
952                 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
953                 for (ServerInfo info : serverConfig.getServerConfig()) {
954                     newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
955                             type + datastoreTypeSuffix).toString(), info.isVoting()));
956                 }
957
958                 final String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
959                         type + datastoreTypeSuffix).toString();
960                 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
961                 InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,
962                         new ServerConfigurationPayload(newServerInfo)));
963             }
964         }
965     }
966
967     @SafeVarargs
968     private static void verifyVotingStates(final AbstractDataStore[] datastores, final String[] shards,
969             final SimpleEntry<String, Boolean>... expStates) throws Exception {
970         for (AbstractDataStore datastore: datastores) {
971             for (String shard: shards) {
972                 verifyVotingStates(datastore, shard, expStates);
973             }
974         }
975     }
976
977     @SafeVarargs
978     private static void verifyVotingStates(final AbstractDataStore datastore, final String shardName,
979             final SimpleEntry<String, Boolean>... expStates) throws Exception {
980         String localMemberName = datastore.getActorUtils().getCurrentMemberName().getName();
981         Map<String, Boolean> expStateMap = new HashMap<>();
982         for (Entry<String, Boolean> e: expStates) {
983             expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
984                     datastore.getActorUtils().getDataStoreName()).toString(), e.getValue());
985         }
986
987         verifyRaftState(datastore, shardName, raftState -> {
988             String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
989                     datastore.getActorUtils().getDataStoreName()).toString();
990             assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
991             for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
992                 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
993             }
994         });
995     }
996
997     private static void verifyShardResults(final Map<ShardResultKey, ShardResult> shardResults,
998             final ShardResult... expShardResults) {
999         Map<String, ShardResult> expResultsMap = new HashMap<>();
1000         for (ShardResult r: expShardResults) {
1001             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1002         }
1003
1004         for (ShardResult result: shardResults.values()) {
1005             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1006             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1007                     result.getDataStoreType()), exp);
1008             assertEquals("isSucceeded", exp.getSucceeded(), result.getSucceeded());
1009             if (exp.getSucceeded()) {
1010                 assertNull("Expected null error message", result.getErrorMessage());
1011             } else {
1012                 assertNotNull("Expected error message", result.getErrorMessage());
1013             }
1014         }
1015
1016         if (!expResultsMap.isEmpty()) {
1017             fail("Missing shard results for " + expResultsMap.keySet());
1018         }
1019     }
1020
1021     private static ShardResult successShardResult(final String shardName, final DataStoreType type) {
1022         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1023     }
1024
1025     private static ShardResult failedShardResult(final String shardName, final DataStoreType type) {
1026         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();
1027     }
1028 }