1362db752e123014c7a6e832262f7bda4a6f00ac
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.hamcrest.MatcherAssert.assertThat;
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertFalse;
17 import static org.junit.Assert.assertNotNull;
18 import static org.junit.Assert.assertNull;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
24
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.collect.ImmutableList;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.collect.Iterables;
32 import com.google.common.collect.Lists;
33 import com.google.common.collect.Sets;
34 import java.io.File;
35 import java.io.FileInputStream;
36 import java.net.URI;
37 import java.util.AbstractMap.SimpleEntry;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.HashMap;
42 import java.util.HashSet;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Map.Entry;
46 import java.util.Optional;
47 import java.util.Set;
48 import java.util.concurrent.TimeUnit;
49 import org.apache.commons.lang3.SerializationUtils;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.opendaylight.controller.cluster.access.concepts.MemberName;
54 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
55 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
56 import org.opendaylight.controller.cluster.datastore.MemberNode;
57 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
58 import org.opendaylight.controller.cluster.datastore.Shard;
59 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
60 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
62 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
63 import org.opendaylight.controller.cluster.raft.RaftState;
64 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
65 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
66 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
67 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
68 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
69 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
70 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
71 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
72 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInputBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey;
96 import org.opendaylight.yangtools.yang.common.RpcError;
97 import org.opendaylight.yangtools.yang.common.RpcResult;
98 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
99
100 /**
101  * Unit tests for ClusterAdminRpcService.
102  *
103  * @author Thomas Pantelis
104  */
105 public class ClusterAdminRpcServiceTest {
106     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
107     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
108     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
109     private final List<MemberNode> memberNodes = new ArrayList<>();
110
111     @Before
112     public void setUp() {
113         InMemoryJournal.clear();
114         InMemorySnapshotStore.clear();
115     }
116
117     @After
118     public void tearDown() {
119         for (MemberNode m : Lists.reverse(memberNodes)) {
120             m.cleanup();
121         }
122         memberNodes.clear();
123     }
124
125     @Test
126     public void testBackupDatastore() throws Exception {
127         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
128                 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
129                 .testName("testBackupDatastore").build();
130
131         String fileName = "target/testBackupDatastore";
132         new File(fileName).delete();
133
134         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
135
136         RpcResult<BackupDatastoreOutput> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
137                 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
138         verifySuccessfulRpcResult(rpcResult);
139
140         try (FileInputStream fis = new FileInputStream(fileName)) {
141             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
142             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
143
144             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
145                     snapshots.get(1).getType(), snapshots.get(1));
146             verifyDatastoreSnapshot(node.configDataStore().getActorUtils().getDataStoreName(),
147                     map.get(node.configDataStore().getActorUtils().getDataStoreName()), "cars", "people");
148         } finally {
149             new File(fileName).delete();
150         }
151
152         // Test failure by killing a shard.
153
154         node.configDataStore().getActorUtils().getShardManager().tell(node.datastoreContextBuilder()
155                 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
156
157         ActorRef carsShardActor = node.configDataStore().getActorUtils().findLocalShard("cars").get();
158         node.kit().watch(carsShardActor);
159         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
160         node.kit().expectTerminated(carsShardActor);
161
162         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
163                 .get(5, TimeUnit.SECONDS);
164         assertFalse("isSuccessful", rpcResult.isSuccessful());
165         assertEquals("getErrors", 1, rpcResult.getErrors().size());
166     }
167
168     private static void verifyDatastoreSnapshot(final String type, final DatastoreSnapshot datastoreSnapshot,
169             final String... expShardNames) {
170         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
171         Set<String> shardNames = new HashSet<>();
172         for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
173             shardNames.add(s.getName());
174         }
175
176         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
177     }
178
179     @Test
180     public void testGetPrefixShardRole() throws Exception {
181         String name = "testGetPrefixShardRole";
182         String moduleShardsConfig = "module-shards-default-member-1.conf";
183
184         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
185                 .moduleShardsConfig(moduleShardsConfig).build();
186
187         member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default");
188     }
189
190     @Test
191     public void testModuleShardLeaderMovement() throws Exception {
192         String name = "testModuleShardLeaderMovement";
193         String moduleShardsConfig = "module-shards-member1.conf";
194
195         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
196                 .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
197         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
198                 .moduleShardsConfig(moduleShardsConfig).build();
199         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
200                 .moduleShardsConfig(moduleShardsConfig).build();
201
202         member1.waitForMembersUp("member-2", "member-3");
203         replicaNode2.waitForMembersUp("member-1");
204         replicaNode3.waitForMembersUp("member-1", "member-2");
205
206         doAddShardReplica(replicaNode2, "cars", "member-1");
207         doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
208
209         verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
210
211         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
212
213         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
214
215         doMakeShardLeaderLocal(member1, "cars", "member-1");
216         verifyRaftState(replicaNode2.configDataStore(), "cars",
217             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
218         verifyRaftState(replicaNode3.configDataStore(), "cars",
219             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
220
221         doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
222         verifyRaftState(member1.configDataStore(), "cars",
223             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
224         verifyRaftState(replicaNode3.configDataStore(), "cars",
225             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
226
227         replicaNode2.waitForMembersUp("member-3");
228         doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
229     }
230
231     @Test
232     public void testAddShardReplica() throws Exception {
233         String name = "testAddShardReplica";
234         String moduleShardsConfig = "module-shards-cars-member-1.conf";
235         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
236                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
237
238         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
239                 .moduleShardsConfig(moduleShardsConfig).build();
240
241         leaderNode1.waitForMembersUp("member-2");
242
243         doAddShardReplica(newReplicaNode2, "cars", "member-1");
244
245         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
246                 .moduleShardsConfig(moduleShardsConfig).build();
247
248         leaderNode1.waitForMembersUp("member-3");
249         newReplicaNode2.waitForMembersUp("member-3");
250
251         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
252
253         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
254         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
255
256         // Write data to member-2's config datastore and read/verify via member-3
257         final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
258                 newReplicaNode3.configDataStore());
259
260         // Write data to member-3's oper datastore and read/verify via member-2
261         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
262
263         // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 -
264         // 2 ServerConfigurationPayload entries,  the transaction payload entry plus a purge payload.
265
266         RaftStateVerifier verifier = raftState -> {
267             assertEquals("Commit index", 4, raftState.getCommitIndex());
268             assertEquals("Last applied index", 4, raftState.getLastApplied());
269         };
270
271         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
272         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
273
274         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
275         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
276
277         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
278         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
279
280         // Restart member-3 and verify the cars config shard is re-instated.
281
282         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
283         newReplicaNode3.cleanup();
284
285         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
286                 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
287
288         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
289         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
290     }
291
292     @Test
293     public void testAddShardReplicaFailures() throws Exception {
294         String name = "testAddShardReplicaFailures";
295         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
296                 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
297
298         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
299                 memberNode.operDataStore(), null);
300
301         RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
302                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
303         verifyFailedRpcResult(rpcResult);
304
305         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
306                 .build()).get(10, TimeUnit.SECONDS);
307         verifyFailedRpcResult(rpcResult);
308
309         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
310                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
311         verifyFailedRpcResult(rpcResult);
312     }
313
314     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(final AbstractDataStore writeToStore,
315             final AbstractDataStore readFromStore) throws Exception {
316         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
317         NormalizedNode<?, ?> carsNode = CarsModel.create();
318         writeTx.write(CarsModel.BASE_PATH, carsNode);
319
320         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
321         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
322         assertEquals("canCommit", TRUE, canCommit);
323         cohort.preCommit().get(5, TimeUnit.SECONDS);
324         cohort.commit().get(5, TimeUnit.SECONDS);
325
326         readCarsNodeAndVerify(readFromStore, carsNode);
327         return carsNode;
328     }
329
330     private static void readCarsNodeAndVerify(final AbstractDataStore readFromStore,
331             final NormalizedNode<?, ?> expCarsNode) throws Exception {
332         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction().read(CarsModel.BASE_PATH)
333                 .get(15, TimeUnit.SECONDS);
334         assertTrue("isPresent", optional.isPresent());
335         assertEquals("Data node", expCarsNode, optional.get());
336     }
337
338     private static void doAddShardReplica(final MemberNode memberNode, final String shardName,
339             final String... peerMemberNames) throws Exception {
340         memberNode.waitForMembersUp(peerMemberNames);
341
342         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
343                 memberNode.operDataStore(), null);
344
345         RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
346             .setShardName(shardName).setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
347         verifySuccessfulRpcResult(rpcResult);
348
349         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
350
351         Optional<ActorRef> optional = memberNode.operDataStore().getActorUtils().findLocalShard(shardName);
352         assertFalse("Oper shard present", optional.isPresent());
353
354         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
355                 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
356         verifySuccessfulRpcResult(rpcResult);
357
358         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
359     }
360
361     private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName,
362             final String newLeader) throws Exception {
363         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
364                 memberNode.operDataStore(), null);
365
366         final RpcResult<MakeLeaderLocalOutput> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
367                 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
368                 .get(10, TimeUnit.SECONDS);
369
370         verifySuccessfulRpcResult(rpcResult);
371
372         verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
373                 containsString(newLeader)));
374     }
375
376     private static <T> T verifySuccessfulRpcResult(final RpcResult<T> rpcResult) {
377         if (!rpcResult.isSuccessful()) {
378             if (rpcResult.getErrors().size() > 0) {
379                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
380                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
381             }
382
383             fail("Rpc failed with no error");
384         }
385
386         return rpcResult.getResult();
387     }
388
389     private static void verifyFailedRpcResult(final RpcResult<?> rpcResult) {
390         assertFalse("RpcResult", rpcResult.isSuccessful());
391         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
392         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
393         assertNotNull("RpcResult error message null", error.getMessage());
394     }
395
396     @Test
397     public void testRemoveShardReplica() throws Exception {
398         String name = "testRemoveShardReplica";
399         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
400         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
401                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
402                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
403                 .build();
404
405         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
406                 .moduleShardsConfig(moduleShardsConfig).build();
407
408         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
409                 .moduleShardsConfig(moduleShardsConfig).build();
410
411         leaderNode1.configDataStore().waitTillReady();
412         replicaNode3.configDataStore().waitTillReady();
413         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
414         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
415         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
416
417         // Invoke RPC service on member-3 to remove it's local shard
418
419         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
420                 replicaNode3.operDataStore(), null);
421
422         RpcResult<RemoveShardReplicaOutput> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
423                 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
424                 .get(10, TimeUnit.SECONDS);
425         verifySuccessfulRpcResult(rpcResult);
426
427         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
428         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
429         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
430
431         // Restart member-2 and verify member-3 isn't present.
432
433         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
434         replicaNode2.cleanup();
435
436         MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
437                 .moduleShardsConfig(moduleShardsConfig).build();
438
439         newPeplicaNode2.configDataStore().waitTillReady();
440         verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
441
442         // Invoke RPC service on member-1 to remove member-2
443
444         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
445                 leaderNode1.operDataStore(), null);
446
447         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
448                 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
449         verifySuccessfulRpcResult(rpcResult);
450
451         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
452         verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
453     }
454
455     @Test
456     public void testRemoveShardLeaderReplica() throws Exception {
457         String name = "testRemoveShardLeaderReplica";
458         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
459         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
460                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
461                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
462                 .build();
463
464         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
465                 .moduleShardsConfig(moduleShardsConfig).build();
466
467         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
468                 .moduleShardsConfig(moduleShardsConfig).build();
469
470         leaderNode1.configDataStore().waitTillReady();
471         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
472         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
473         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
474
475         replicaNode2.waitForMembersUp("member-1", "member-3");
476         replicaNode3.waitForMembersUp("member-1", "member-2");
477
478         // Invoke RPC service on leader member-1 to remove it's local shard
479
480         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
481                 leaderNode1.operDataStore(), null);
482
483         RpcResult<RemoveShardReplicaOutput> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
484                 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
485                 .get(10, TimeUnit.SECONDS);
486         verifySuccessfulRpcResult(rpcResult);
487
488         verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
489                 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
490                         containsString("member-3"))));
491
492         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
493         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
494         verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
495     }
496
497     @Test
498     public void testAddReplicasForAllShards() throws Exception {
499         String name = "testAddReplicasForAllShards";
500         String moduleShardsConfig = "module-shards-member1.conf";
501         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
502                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
503
504         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
505                                                                                  "pets", null,
506                                                                                  Collections.singletonList(MEMBER_1));
507         leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
508                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
509         leaderNode1.kit().expectMsgClass(Success.class);
510         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorUtils(), "pets");
511
512         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
513                 .moduleShardsConfig(moduleShardsConfig).build();
514
515         leaderNode1.waitForMembersUp("member-2");
516         newReplicaNode2.waitForMembersUp("member-1");
517
518         newReplicaNode2.configDataStore().getActorUtils().getShardManager().tell(
519                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
520         newReplicaNode2.kit().expectMsgClass(Success.class);
521
522         newReplicaNode2.operDataStore().getActorUtils().getShardManager().tell(
523                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
524                                                              "no-leader", null,
525                                                              Collections.singletonList(MEMBER_1)),
526                                 Shard.builder(), null),
527                                 newReplicaNode2.kit().getRef());
528         newReplicaNode2.kit().expectMsgClass(Success.class);
529
530         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
531                 newReplicaNode2.operDataStore(), null);
532
533         RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards(
534             new AddReplicasForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
535         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
536         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
537                 successShardResult("people", DataStoreType.Config),
538                 successShardResult("pets", DataStoreType.Config),
539                 successShardResult("cars", DataStoreType.Operational),
540                 successShardResult("people", DataStoreType.Operational),
541                 failedShardResult("no-leader", DataStoreType.Operational));
542
543         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
544         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
545         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
546         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
547         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
548     }
549
550     @Test
551     public void testRemoveAllShardReplicas() throws Exception {
552         String name = "testRemoveAllShardReplicas";
553         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
554         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
555                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
556                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
557                 .build();
558
559         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
560                 .moduleShardsConfig(moduleShardsConfig).build();
561
562         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
563                 .moduleShardsConfig(moduleShardsConfig).build();
564
565         leaderNode1.configDataStore().waitTillReady();
566         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
567         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
568         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
569
570         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
571                 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
572         leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
573                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
574         leaderNode1.kit().expectMsgClass(Success.class);
575
576         replicaNode2.configDataStore().getActorUtils().getShardManager().tell(
577                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
578         replicaNode2.kit().expectMsgClass(Success.class);
579
580         replicaNode3.configDataStore().getActorUtils().getShardManager().tell(
581                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
582         replicaNode3.kit().expectMsgClass(Success.class);
583
584         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
585         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
586         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
587
588         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
589                 replicaNode3.operDataStore(), null);
590
591         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
592                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
593         RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
594         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
595                 successShardResult("people", DataStoreType.Config),
596                 successShardResult("pets", DataStoreType.Config),
597                 successShardResult("cars", DataStoreType.Operational),
598                 successShardResult("people", DataStoreType.Operational));
599
600         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
601         verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
602         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
603         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
604         verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
605         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
606         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
607         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
608         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
609     }
610
611     @Test
612     public void testChangeMemberVotingStatesForShard() throws Exception {
613         String name = "testChangeMemberVotingStatusForShard";
614         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
615         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
616                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
617                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
618                 .build();
619
620         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
621                 .moduleShardsConfig(moduleShardsConfig).build();
622
623         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
624                 .moduleShardsConfig(moduleShardsConfig).build();
625
626         leaderNode1.configDataStore().waitTillReady();
627         replicaNode3.configDataStore().waitTillReady();
628         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
629         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
630         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
631
632         // Invoke RPC service on member-3 to change voting status
633
634         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
635                 replicaNode3.operDataStore(), null);
636
637         RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service3
638                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
639                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
640                         .setMemberVotingState(ImmutableList.of(
641                                 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
642                                 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
643                         .build())
644                 .get(10, TimeUnit.SECONDS);
645         verifySuccessfulRpcResult(rpcResult);
646
647         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
648                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
649         verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
650                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
651         verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
652                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
653     }
654
655     @Test
656     public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
657         String name = "testChangeMemberVotingStatesForSingleNodeShard";
658         String moduleShardsConfig = "module-shards-member1.conf";
659         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
660                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
661                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
662                 .build();
663
664         leaderNode.configDataStore().waitTillReady();
665
666         // Invoke RPC service on member-3 to change voting status
667
668         ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
669                 leaderNode.operDataStore(), null);
670
671         RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service
672                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
673                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
674                         .setMemberVotingState(ImmutableList
675                                 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
676                         .build())
677                 .get(10, TimeUnit.SECONDS);
678         verifyFailedRpcResult(rpcResult);
679
680         verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
681     }
682
683     @Test
684     public void testChangeMemberVotingStatesForAllShards() throws Exception {
685         String name = "testChangeMemberVotingStatesForAllShards";
686         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
687         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
688                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
689                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
690                 .build();
691
692         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
693                 .moduleShardsConfig(moduleShardsConfig).build();
694
695         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
696                 .moduleShardsConfig(moduleShardsConfig).build();
697
698         leaderNode1.configDataStore().waitTillReady();
699         leaderNode1.operDataStore().waitTillReady();
700         replicaNode3.configDataStore().waitTillReady();
701         replicaNode3.operDataStore().waitTillReady();
702         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
703         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
704         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
705
706         // Invoke RPC service on member-3 to change voting status
707
708         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
709                 replicaNode3.operDataStore(), null);
710
711         RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
712                 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
713                         new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
714                         new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
715                 .get(10, TimeUnit.SECONDS);
716         ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
717         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
718                 successShardResult("people", DataStoreType.Config),
719                 successShardResult("cars", DataStoreType.Operational),
720                 successShardResult("people", DataStoreType.Operational));
721
722         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
723                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
724                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
725                 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
726                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
727     }
728
729     @Test
730     public void testFlipMemberVotingStates() throws Exception {
731         String name = "testFlipMemberVotingStates";
732
733         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
734                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
735                 new ServerInfo("member-3", false)));
736
737         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
738         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
739         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
740
741         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
742         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
743                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder()
744                         .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10))
745                 .build();
746
747         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
748                 .moduleShardsConfig(moduleShardsConfig).build();
749
750         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
751                 .moduleShardsConfig(moduleShardsConfig).build();
752
753         leaderNode1.configDataStore().waitTillReady();
754         leaderNode1.operDataStore().waitTillReady();
755         replicaNode3.configDataStore().waitTillReady();
756         replicaNode3.operDataStore().waitTillReady();
757         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
758                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
759
760         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
761                 replicaNode3.operDataStore(), null);
762
763         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards(
764             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
765         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
766         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
767                 successShardResult("people", DataStoreType.Config),
768                 successShardResult("cars", DataStoreType.Operational),
769                 successShardResult("people", DataStoreType.Operational));
770
771         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
772                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
773                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
774                 new String[]{"cars", "people"},
775                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
776                 new SimpleEntry<>("member-3", TRUE));
777
778         // Leadership should have transferred to member 3 since it is the only remaining voting member.
779         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
780             assertNotNull("Expected non-null leader Id", raftState.getLeader());
781             assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
782                     raftState.getLeader().contains("member-3"));
783         });
784
785         verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
786             assertNotNull("Expected non-null leader Id", raftState.getLeader());
787             assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
788                     raftState.getLeader().contains("member-3"));
789         });
790
791         // Flip the voting states back to the original states.
792
793         rpcResult = service3.flipMemberVotingStatesForAllShards(
794             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
795         result = verifySuccessfulRpcResult(rpcResult);
796         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
797                 successShardResult("people", DataStoreType.Config),
798                 successShardResult("cars", DataStoreType.Operational),
799                 successShardResult("people", DataStoreType.Operational));
800
801         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
802                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
803                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
804                 new String[]{"cars", "people"},
805                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
806                 new SimpleEntry<>("member-3", FALSE));
807
808         // Leadership should have transferred to member 1 or 2.
809         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
810             assertNotNull("Expected non-null leader Id", raftState.getLeader());
811             assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
812                     raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
813         });
814     }
815
816     @Test
817     public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
818         String name = "testFlipMemberVotingStatesWithNoInitialLeader";
819
820         // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
821         // non-voting and simulated as down by not starting them up.
822         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
823                 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
824                 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
825                 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
826
827         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
828         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
829         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
830
831         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
832         final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
833                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
834                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
835                 .build();
836
837         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
838                 .moduleShardsConfig(moduleShardsConfig).build();
839
840         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
841                 .moduleShardsConfig(moduleShardsConfig).build();
842
843         // Initially there won't be a leader b/c all the up nodes are non-voting.
844
845         replicaNode1.waitForMembersUp("member-2", "member-3");
846
847         verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
848                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
849                 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
850                 new SimpleEntry<>("member-6", TRUE));
851
852         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
853             assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
854
855         ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
856                 replicaNode1.operDataStore(), null);
857
858         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
859             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
860         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
861         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
862                 successShardResult("people", DataStoreType.Config),
863                 successShardResult("cars", DataStoreType.Operational),
864                 successShardResult("people", DataStoreType.Operational));
865
866         verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
867                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
868                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
869                 new String[]{"cars", "people"},
870                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
871                 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
872                 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
873
874         // Since member 1 was changed to voting and there was no leader, it should've started and election
875         // and become leader
876         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
877             assertNotNull("Expected non-null leader Id", raftState.getLeader());
878             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
879                     raftState.getLeader().contains("member-1"));
880         });
881
882         verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
883             assertNotNull("Expected non-null leader Id", raftState.getLeader());
884             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
885                     raftState.getLeader().contains("member-1"));
886         });
887     }
888
889     @Test
890     public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
891         String name = "testFlipMemberVotingStatesWithVotingMembersDown";
892
893         // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
894         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
895                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
896                 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
897                 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
898
899         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
900         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
901         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
902
903         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
904         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
905                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
906                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
907                 .build();
908
909         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
910                 .moduleShardsConfig(moduleShardsConfig).build();
911
912         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
913                 .moduleShardsConfig(moduleShardsConfig).build();
914
915         leaderNode1.configDataStore().waitTillReady();
916         leaderNode1.operDataStore().waitTillReady();
917         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
918                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
919                 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
920                 new SimpleEntry<>("member-6", FALSE));
921
922         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
923                 leaderNode1.operDataStore(), null);
924
925         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
926             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
927         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
928         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
929                 successShardResult("people", DataStoreType.Config),
930                 successShardResult("cars", DataStoreType.Operational),
931                 successShardResult("people", DataStoreType.Operational));
932
933         // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
934         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
935                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
936                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
937                 new String[]{"cars", "people"},
938                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
939                 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
940                 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
941
942         // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
943         // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
944         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
945             assertNotNull("Expected non-null leader Id", raftState.getLeader());
946             assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
947         });
948     }
949
950     private static void setupPersistedServerConfigPayload(final ServerConfigurationPayload serverConfig,
951             final String member, final String datastoreTypeSuffix, final String... shards) {
952         String[] datastoreTypes = {"config_", "oper_"};
953         for (String type : datastoreTypes) {
954             for (String shard : shards) {
955                 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
956                 for (ServerInfo info : serverConfig.getServerConfig()) {
957                     newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
958                             type + datastoreTypeSuffix).toString(), info.isVoting()));
959                 }
960
961                 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
962                         type + datastoreTypeSuffix).toString();
963                 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
964                 InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,
965                         new ServerConfigurationPayload(newServerInfo)));
966             }
967         }
968     }
969
970     @SafeVarargs
971     private static void verifyVotingStates(final AbstractDataStore[] datastores, final String[] shards,
972             final SimpleEntry<String, Boolean>... expStates) throws Exception {
973         for (AbstractDataStore datastore: datastores) {
974             for (String shard: shards) {
975                 verifyVotingStates(datastore, shard, expStates);
976             }
977         }
978     }
979
980     @SafeVarargs
981     private static void verifyVotingStates(final AbstractDataStore datastore, final String shardName,
982             final SimpleEntry<String, Boolean>... expStates) throws Exception {
983         String localMemberName = datastore.getActorUtils().getCurrentMemberName().getName();
984         Map<String, Boolean> expStateMap = new HashMap<>();
985         for (Entry<String, Boolean> e: expStates) {
986             expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
987                     datastore.getActorUtils().getDataStoreName()).toString(), e.getValue());
988         }
989
990         verifyRaftState(datastore, shardName, raftState -> {
991             String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
992                     datastore.getActorUtils().getDataStoreName()).toString();
993             assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
994             for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
995                 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
996             }
997         });
998     }
999
1000     private static void verifyShardResults(final Map<ShardResultKey, ShardResult> shardResults,
1001             final ShardResult... expShardResults) {
1002         Map<String, ShardResult> expResultsMap = new HashMap<>();
1003         for (ShardResult r: expShardResults) {
1004             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1005         }
1006
1007         for (ShardResult result: shardResults.values()) {
1008             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1009             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1010                     result.getDataStoreType()), exp);
1011             assertEquals("isSucceeded", exp.getSucceeded(), result.getSucceeded());
1012             if (exp.getSucceeded()) {
1013                 assertNull("Expected null error message", result.getErrorMessage());
1014             } else {
1015                 assertNotNull("Expected error message", result.getErrorMessage());
1016             }
1017         }
1018
1019         if (!expResultsMap.isEmpty()) {
1020             fail("Missing shard results for " + expResultsMap.keySet());
1021         }
1022     }
1023
1024     private static ShardResult successShardResult(final String shardName, final DataStoreType type) {
1025         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1026     }
1027
1028     private static ShardResult failedShardResult(final String shardName, final DataStoreType type) {
1029         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();
1030     }
1031 }