Fixup eclipse warnings in ClusterAdminRpcServiceTest
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertFalse;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertThat;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
24
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Sets;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.net.URI;
38 import java.util.AbstractMap.SimpleEntry;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.HashMap;
42 import java.util.HashSet;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Map.Entry;
46 import java.util.Set;
47 import java.util.concurrent.TimeUnit;
48 import org.apache.commons.lang3.SerializationUtils;
49 import org.junit.After;
50 import org.junit.Before;
51 import org.junit.Test;
52 import org.opendaylight.controller.cluster.access.concepts.MemberName;
53 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
54 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
55 import org.opendaylight.controller.cluster.datastore.MemberNode;
56 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
57 import org.opendaylight.controller.cluster.datastore.Shard;
58 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
59 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
60 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
61 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
62 import org.opendaylight.controller.cluster.raft.RaftState;
63 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
64 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
65 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
66 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
67 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
68 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
69 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
86 import org.opendaylight.yangtools.yang.common.RpcError;
87 import org.opendaylight.yangtools.yang.common.RpcResult;
88 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
89
90 /**
91  * Unit tests for ClusterAdminRpcService.
92  *
93  * @author Thomas Pantelis
94  */
95 public class ClusterAdminRpcServiceTest {
96     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
97     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
98     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
99     private final List<MemberNode> memberNodes = new ArrayList<>();
100
101     @Before
102     public void setUp() {
103         InMemoryJournal.clear();
104         InMemorySnapshotStore.clear();
105     }
106
107     @After
108     public void tearDown() {
109         for (MemberNode m : Lists.reverse(memberNodes)) {
110             m.cleanup();
111         }
112         memberNodes.clear();
113     }
114
115     @Test
116     public void testBackupDatastore() throws Exception {
117         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
118                 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
119                 .testName("testBackupDatastore").build();
120
121         String fileName = "target/testBackupDatastore";
122         new File(fileName).delete();
123
124         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore());
125
126         RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
127                 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
128         verifySuccessfulRpcResult(rpcResult);
129
130         try (FileInputStream fis = new FileInputStream(fileName)) {
131             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
132             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
133
134             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
135                     snapshots.get(1).getType(), snapshots.get(1));
136             verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(),
137                     map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people");
138         } finally {
139             new File(fileName).delete();
140         }
141
142         // Test failure by killing a shard.
143
144         node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder()
145                 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
146
147         ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
148         node.kit().watch(carsShardActor);
149         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
150         node.kit().expectTerminated(carsShardActor);
151
152         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
153                 .get(5, TimeUnit.SECONDS);
154         assertFalse("isSuccessful", rpcResult.isSuccessful());
155         assertEquals("getErrors", 1, rpcResult.getErrors().size());
156     }
157
158     private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot,
159             String... expShardNames) {
160         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
161         Set<String> shardNames = new HashSet<>();
162         for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
163             shardNames.add(s.getName());
164         }
165
166         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
167     }
168
169     @Test
170     public void testAddShardReplica() throws Exception {
171         String name = "testAddShardReplica";
172         String moduleShardsConfig = "module-shards-cars-member-1.conf";
173         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name )
174                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
175
176         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
177                 .moduleShardsConfig(moduleShardsConfig).build();
178
179         leaderNode1.waitForMembersUp("member-2");
180
181         doAddShardReplica(newReplicaNode2, "cars", "member-1");
182
183         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
184                 .moduleShardsConfig(moduleShardsConfig).build();
185
186         leaderNode1.waitForMembersUp("member-3");
187         newReplicaNode2.waitForMembersUp("member-3");
188
189         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
190
191         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
192         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
193
194         // Write data to member-2's config datastore and read/verify via member-3
195         final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
196                 newReplicaNode3.configDataStore());
197
198         // Write data to member-3's oper datastore and read/verify via member-2
199         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
200
201         // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
202         // 2 ServerConfigurationPayload entries and the transaction payload entry.
203
204         RaftStateVerifier verifier = raftState -> {
205             assertEquals("Commit index", 2, raftState.getCommitIndex());
206             assertEquals("Last applied index", 2, raftState.getLastApplied());
207         };
208
209         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
210         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
211
212         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
213         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
214
215         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
216         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
217
218         // Restart member-3 and verify the cars config shard is re-instated.
219
220         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
221         newReplicaNode3.cleanup();
222
223         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
224                 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
225
226         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
227         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
228     }
229
230     @Test
231     public void testAddShardReplicaFailures() throws Exception {
232         String name = "testAddShardReplicaFailures";
233         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
234                 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
235
236         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
237                 memberNode.operDataStore());
238
239         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
240                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
241         verifyFailedRpcResult(rpcResult);
242
243         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
244                 .build()).get(10, TimeUnit.SECONDS);
245         verifyFailedRpcResult(rpcResult);
246
247         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
248                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
249         verifyFailedRpcResult(rpcResult);
250     }
251
252     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(AbstractDataStore writeToStore,
253             AbstractDataStore readFromStore) throws Exception {
254         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
255         NormalizedNode<?, ?> carsNode = CarsModel.create();
256         writeTx.write(CarsModel.BASE_PATH, carsNode);
257
258         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
259         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
260         assertEquals("canCommit", TRUE, canCommit);
261         cohort.preCommit().get(5, TimeUnit.SECONDS);
262         cohort.commit().get(5, TimeUnit.SECONDS);
263
264         readCarsNodeAndVerify(readFromStore, carsNode);
265         return carsNode;
266     }
267
268     private static void readCarsNodeAndVerify(AbstractDataStore readFromStore,
269             NormalizedNode<?, ?> expCarsNode) throws Exception {
270         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
271                 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
272         assertTrue("isPresent", optional.isPresent());
273         assertEquals("Data node", expCarsNode, optional.get());
274     }
275
276     private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
277             throws Exception {
278         memberNode.waitForMembersUp(peerMemberNames);
279
280         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
281                 memberNode.operDataStore());
282
283         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
284                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
285         verifySuccessfulRpcResult(rpcResult);
286
287         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
288
289         Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
290         assertFalse("Oper shard present", optional.isPresent());
291
292         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
293                 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
294         verifySuccessfulRpcResult(rpcResult);
295
296         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
297     }
298
299     private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
300         if (!rpcResult.isSuccessful()) {
301             if (rpcResult.getErrors().size() > 0) {
302                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
303                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
304             }
305
306             fail("Rpc failed with no error");
307         }
308
309         return rpcResult.getResult();
310     }
311
312     private static void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
313         assertFalse("RpcResult", rpcResult.isSuccessful());
314         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
315         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
316         assertNotNull("RpcResult error message null", error.getMessage());
317     }
318
319     @Test
320     public void testRemoveShardReplica() throws Exception {
321         String name = "testRemoveShardReplica";
322         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
323         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
324                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
325                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
326                 .build();
327
328         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
329                 .moduleShardsConfig(moduleShardsConfig).build();
330
331         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
332                 .moduleShardsConfig(moduleShardsConfig).build();
333
334         leaderNode1.configDataStore().waitTillReady();
335         replicaNode3.configDataStore().waitTillReady();
336         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
337         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
338         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
339
340         // Invoke RPC service on member-3 to remove it's local shard
341
342         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
343                 replicaNode3.operDataStore());
344
345         RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
346                 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
347                 .get(10, TimeUnit.SECONDS);
348         verifySuccessfulRpcResult(rpcResult);
349
350         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
351         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
352         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
353
354         // Restart member-2 and verify member-3 isn't present.
355
356         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
357         replicaNode2.cleanup();
358
359         MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
360                 .moduleShardsConfig(moduleShardsConfig).build();
361
362         newPeplicaNode2.configDataStore().waitTillReady();
363         verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
364
365         // Invoke RPC service on member-1 to remove member-2
366
367         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
368                 leaderNode1.operDataStore());
369
370         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
371                 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
372         verifySuccessfulRpcResult(rpcResult);
373
374         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
375         verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
376     }
377
378     @Test
379     public void testRemoveShardLeaderReplica() throws Exception {
380         String name = "testRemoveShardLeaderReplica";
381         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
382         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
383                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
384                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
385                 .build();
386
387         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
388                 .moduleShardsConfig(moduleShardsConfig).build();
389
390         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
391                 .moduleShardsConfig(moduleShardsConfig).build();
392
393         leaderNode1.configDataStore().waitTillReady();
394         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
395         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
396         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
397
398         replicaNode2.waitForMembersUp("member-1", "member-3");
399         replicaNode3.waitForMembersUp("member-1", "member-2");
400
401         // Invoke RPC service on leader member-1 to remove it's local shard
402
403         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
404                 leaderNode1.operDataStore());
405
406         RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
407                 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
408                 .get(10, TimeUnit.SECONDS);
409         verifySuccessfulRpcResult(rpcResult);
410
411         verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
412                 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
413                         containsString("member-3"))));
414
415         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
416         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
417         verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
418     }
419
420     @Test
421     public void testAddReplicasForAllShards() throws Exception {
422         String name = "testAddReplicasForAllShards";
423         String moduleShardsConfig = "module-shards-member1.conf";
424         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name )
425                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
426
427         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
428                 "pets", null, Arrays.asList(MEMBER_1));
429         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
430                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
431         leaderNode1.kit().expectMsgClass(Success.class);
432         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
433
434         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
435                 .moduleShardsConfig(moduleShardsConfig).build();
436
437         leaderNode1.waitForMembersUp("member-2");
438         newReplicaNode2.waitForMembersUp("member-1");
439
440         newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
441                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
442         newReplicaNode2.kit().expectMsgClass(Success.class);
443
444         newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
445                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
446                         "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null),
447                                 newReplicaNode2.kit().getRef());
448         newReplicaNode2.kit().expectMsgClass(Success.class);
449
450         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
451                 newReplicaNode2.operDataStore());
452
453         RpcResult<AddReplicasForAllShardsOutput> rpcResult =
454                 service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
455         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
456         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
457                 successShardResult("people", DataStoreType.Config),
458                 successShardResult("pets", DataStoreType.Config),
459                 successShardResult("cars", DataStoreType.Operational),
460                 successShardResult("people", DataStoreType.Operational),
461                 failedShardResult("no-leader", DataStoreType.Operational));
462
463         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
464         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
465         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
466         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
467         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
468     }
469
470     @Test
471     public void testRemoveAllShardReplicas() throws Exception {
472         String name = "testRemoveAllShardReplicas";
473         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
474         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
475                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
476                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
477                 .build();
478
479         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
480                 .moduleShardsConfig(moduleShardsConfig).build();
481
482         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
483                 .moduleShardsConfig(moduleShardsConfig).build();
484
485         leaderNode1.configDataStore().waitTillReady();
486         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
487         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
488         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
489
490         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
491                 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
492         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
493                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
494         leaderNode1.kit().expectMsgClass(Success.class);
495
496         replicaNode2.configDataStore().getActorContext().getShardManager().tell(
497                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
498         replicaNode2.kit().expectMsgClass(Success.class);
499
500         replicaNode3.configDataStore().getActorContext().getShardManager().tell(
501                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
502         replicaNode3.kit().expectMsgClass(Success.class);
503
504         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
505         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
506         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
507
508         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
509                 replicaNode3.operDataStore());
510
511         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
512                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
513         RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
514         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
515                 successShardResult("people", DataStoreType.Config),
516                 successShardResult("pets", DataStoreType.Config),
517                 successShardResult("cars", DataStoreType.Operational),
518                 successShardResult("people", DataStoreType.Operational));
519
520         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
521         verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
522         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
523         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
524         verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
525         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
526         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
527         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
528         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
529     }
530
531     @Test
532     public void testChangeMemberVotingStatesForShard() throws Exception {
533         String name = "testChangeMemberVotingStatusForShard";
534         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
535         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
536                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
537                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
538                 .build();
539
540         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
541                 .moduleShardsConfig(moduleShardsConfig).build();
542
543         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
544                 .moduleShardsConfig(moduleShardsConfig).build();
545
546         leaderNode1.configDataStore().waitTillReady();
547         replicaNode3.configDataStore().waitTillReady();
548         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
549         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
550         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
551
552         // Invoke RPC service on member-3 to change voting status
553
554         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
555                 replicaNode3.operDataStore());
556
557         RpcResult<Void> rpcResult = service3
558                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
559                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
560                         .setMemberVotingState(ImmutableList.of(
561                                 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
562                                 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
563                         .build())
564                 .get(10, TimeUnit.SECONDS);
565         verifySuccessfulRpcResult(rpcResult);
566
567         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
568                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
569         verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
570                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
571         verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
572                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
573     }
574
575     @Test
576     public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
577         String name = "testChangeMemberVotingStatesForSingleNodeShard";
578         String moduleShardsConfig = "module-shards-member1.conf";
579         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
580                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
581                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
582                 .build();
583
584         leaderNode.configDataStore().waitTillReady();
585
586         // Invoke RPC service on member-3 to change voting status
587
588         ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
589                 leaderNode.operDataStore());
590
591         RpcResult<Void> rpcResult = service
592                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
593                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
594                         .setMemberVotingState(ImmutableList
595                                 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
596                         .build())
597                 .get(10, TimeUnit.SECONDS);
598         verifyFailedRpcResult(rpcResult);
599
600         verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
601     }
602
603     @Test
604     public void testChangeMemberVotingStatesForAllShards() throws Exception {
605         String name = "testChangeMemberVotingStatesForAllShards";
606         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
607         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
608                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
609                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
610                 .build();
611
612         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
613                 .moduleShardsConfig(moduleShardsConfig).build();
614
615         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
616                 .moduleShardsConfig(moduleShardsConfig).build();
617
618         leaderNode1.configDataStore().waitTillReady();
619         leaderNode1.operDataStore().waitTillReady();
620         replicaNode3.configDataStore().waitTillReady();
621         replicaNode3.operDataStore().waitTillReady();
622         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
623         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
624         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
625
626         // Invoke RPC service on member-3 to change voting status
627
628         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
629                 replicaNode3.operDataStore());
630
631         RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
632                 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
633                         new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
634                         new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
635                 .get(10, TimeUnit.SECONDS);
636         ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
637         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
638                 successShardResult("people", DataStoreType.Config),
639                 successShardResult("cars", DataStoreType.Operational),
640                 successShardResult("people", DataStoreType.Operational));
641
642         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
643                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
644                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
645                 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
646                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
647     }
648
649     @Test
650     public void testFlipMemberVotingStates() throws Exception {
651         String name = "testFlipMemberVotingStates";
652
653         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
654                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
655                 new ServerInfo("member-3", false)));
656
657         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
658         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
659         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
660
661         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
662         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
663                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
664                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
665                 .build();
666
667         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
668                 .moduleShardsConfig(moduleShardsConfig).build();
669
670         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
671                 .moduleShardsConfig(moduleShardsConfig).build();
672
673         leaderNode1.configDataStore().waitTillReady();
674         leaderNode1.operDataStore().waitTillReady();
675         replicaNode3.configDataStore().waitTillReady();
676         replicaNode3.operDataStore().waitTillReady();
677         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
678                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
679
680         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
681                 replicaNode3.operDataStore());
682
683         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards()
684                 .get(10, TimeUnit.SECONDS);
685         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
686         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
687                 successShardResult("people", DataStoreType.Config),
688                 successShardResult("cars", DataStoreType.Operational),
689                 successShardResult("people", DataStoreType.Operational));
690
691         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
692                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
693                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
694                 new String[]{"cars", "people"},
695                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
696                 new SimpleEntry<>("member-3", TRUE));
697
698         // Leadership should have transferred to member 3 since it is the only remaining voting member.
699         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
700             assertNotNull("Expected non-null leader Id", raftState.getLeader());
701             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
702                     raftState.getLeader().contains("member-3"));
703         });
704
705         verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
706             assertNotNull("Expected non-null leader Id", raftState.getLeader());
707             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
708                     raftState.getLeader().contains("member-3"));
709         });
710
711         // Flip the voting states back to the original states.
712
713         rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
714         result = verifySuccessfulRpcResult(rpcResult);
715         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
716                 successShardResult("people", DataStoreType.Config),
717                 successShardResult("cars", DataStoreType.Operational),
718                 successShardResult("people", DataStoreType.Operational));
719
720         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
721                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
722                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
723                 new String[]{"cars", "people"},
724                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
725                 new SimpleEntry<>("member-3", FALSE));
726
727         // Leadership should have transferred to member 1 or 2.
728         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
729             assertNotNull("Expected non-null leader Id", raftState.getLeader());
730             assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
731                     raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
732         });
733     }
734
735     @Test
736     public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
737         String name = "testFlipMemberVotingStatesWithNoInitialLeader";
738
739         // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
740         // non-voting and simulated as down by not starting them up.
741         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
742                 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
743                 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
744                 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
745
746         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
747         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
748         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
749
750         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
751         final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
752                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
753                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
754                 .build();
755
756         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
757                 .moduleShardsConfig(moduleShardsConfig).build();
758
759         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
760                 .moduleShardsConfig(moduleShardsConfig).build();
761
762         // Initially there won't be a leader b/c all the up nodes are non-voting.
763
764         replicaNode1.waitForMembersUp("member-2", "member-3");
765
766         verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
767                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
768                 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
769                 new SimpleEntry<>("member-6", TRUE));
770
771         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
772             assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
773
774         ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
775                 replicaNode1.operDataStore());
776
777         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
778                 .get(10, TimeUnit.SECONDS);
779         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
780         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
781                 successShardResult("people", DataStoreType.Config),
782                 successShardResult("cars", DataStoreType.Operational),
783                 successShardResult("people", DataStoreType.Operational));
784
785         verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
786                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
787                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
788                 new String[]{"cars", "people"},
789                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
790                 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
791                 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
792
793         // Since member 1 was changed to voting and there was no leader, it should've started and election
794         // and become leader
795         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
796             assertNotNull("Expected non-null leader Id", raftState.getLeader());
797             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
798                     raftState.getLeader().contains("member-1"));
799         });
800
801         verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
802             assertNotNull("Expected non-null leader Id", raftState.getLeader());
803             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
804                     raftState.getLeader().contains("member-1"));
805         });
806     }
807
808     @Test
809     public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
810         String name = "testFlipMemberVotingStatesWithVotingMembersDown";
811
812         // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
813         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
814                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
815                 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
816                 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
817
818         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
819         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
820         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
821
822         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
823         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
824                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
825                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
826                 .build();
827
828         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
829                 .moduleShardsConfig(moduleShardsConfig).build();
830
831         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
832                 .moduleShardsConfig(moduleShardsConfig).build();
833
834         leaderNode1.configDataStore().waitTillReady();
835         leaderNode1.operDataStore().waitTillReady();
836         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
837                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
838                 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
839                 new SimpleEntry<>("member-6", FALSE));
840
841         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
842                 leaderNode1.operDataStore());
843
844         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
845                 .get(10, TimeUnit.SECONDS);
846         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
847         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
848                 successShardResult("people", DataStoreType.Config),
849                 successShardResult("cars", DataStoreType.Operational),
850                 successShardResult("people", DataStoreType.Operational));
851
852         // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
853         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
854                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
855                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
856                 new String[]{"cars", "people"},
857                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
858                 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
859                 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
860
861         // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
862         // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
863         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
864             assertNotNull("Expected non-null leader Id", raftState.getLeader());
865             assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
866         });
867     }
868
869     private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
870             String member, String datastoreTypeSuffix, String... shards) {
871         String[] datastoreTypes = {"config_", "oper_"};
872         for (String type : datastoreTypes) {
873             for (String shard : shards) {
874                 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
875                 for (ServerInfo info : serverConfig.getServerConfig()) {
876                     newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
877                             type + datastoreTypeSuffix).toString(), info.isVoting()));
878                 }
879
880                 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
881                         type + datastoreTypeSuffix).toString();
882                 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
883                 InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1,
884                         new ServerConfigurationPayload(newServerInfo)));
885             }
886         }
887     }
888
889     @SafeVarargs
890     private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards,
891             SimpleEntry<String, Boolean>... expStates) throws Exception {
892         for (AbstractDataStore datastore: datastores) {
893             for (String shard: shards) {
894                 verifyVotingStates(datastore, shard, expStates);
895             }
896         }
897     }
898
899     @SafeVarargs
900     private static void verifyVotingStates(AbstractDataStore datastore, String shardName,
901             SimpleEntry<String, Boolean>... expStates) throws Exception {
902         String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
903         Map<String, Boolean> expStateMap = new HashMap<>();
904         for (Entry<String, Boolean> e: expStates) {
905             expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
906                     datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
907         }
908
909         verifyRaftState(datastore, shardName, raftState -> {
910             String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
911                     datastore.getActorContext().getDataStoreName()).toString();
912             assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
913             for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
914                 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
915             }
916         });
917     }
918
919     private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
920         Map<String, ShardResult> expResultsMap = new HashMap<>();
921         for (ShardResult r: expShardResults) {
922             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
923         }
924
925         for (ShardResult result: shardResults) {
926             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
927             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
928                     result.getDataStoreType()), exp);
929             assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
930             if (exp.isSucceeded()) {
931                 assertNull("Expected null error message", result.getErrorMessage());
932             } else {
933                 assertNotNull("Expected error message", result.getErrorMessage());
934             }
935         }
936
937         if (!expResultsMap.isEmpty()) {
938             fail("Missing shard results for " + expResultsMap.keySet());
939         }
940     }
941
942     private static ShardResult successShardResult(String shardName, DataStoreType type) {
943         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
944     }
945
946     private static ShardResult failedShardResult(String shardName, DataStoreType type) {
947         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();
948     }
949 }