5cca07428f2e61ddf2b6700bb32e55a7abfb1b67
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertFalse;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertThat;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
24
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Sets;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.net.URI;
38 import java.util.AbstractMap.SimpleEntry;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Map.Entry;
47 import java.util.Set;
48 import java.util.concurrent.TimeUnit;
49 import org.apache.commons.lang3.SerializationUtils;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.Mockito;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
57 import org.opendaylight.controller.cluster.datastore.MemberNode;
58 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
66 import org.opendaylight.controller.cluster.raft.RaftState;
67 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
69 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
74 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
77 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
78 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
79 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
99 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
100 import org.opendaylight.yangtools.yang.common.RpcError;
101 import org.opendaylight.yangtools.yang.common.RpcResult;
102 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
103
104 /**
105  * Unit tests for ClusterAdminRpcService.
106  *
107  * @author Thomas Pantelis
108  */
109 public class ClusterAdminRpcServiceTest {
110     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
111     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
112     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
113     private final List<MemberNode> memberNodes = new ArrayList<>();
114
115     @Before
116     public void setUp() {
117         InMemoryJournal.clear();
118         InMemorySnapshotStore.clear();
119     }
120
121     @After
122     public void tearDown() {
123         for (MemberNode m : Lists.reverse(memberNodes)) {
124             m.cleanup();
125         }
126         memberNodes.clear();
127     }
128
129     @Test
130     public void testBackupDatastore() throws Exception {
131         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
132                 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
133                 .testName("testBackupDatastore").build();
134
135         String fileName = "target/testBackupDatastore";
136         new File(fileName).delete();
137
138         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
139
140         RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
141                 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
142         verifySuccessfulRpcResult(rpcResult);
143
144         try (FileInputStream fis = new FileInputStream(fileName)) {
145             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
146             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
147
148             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
149                     snapshots.get(1).getType(), snapshots.get(1));
150             verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(),
151                     map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people");
152         } finally {
153             new File(fileName).delete();
154         }
155
156         // Test failure by killing a shard.
157
158         node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder()
159                 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
160
161         ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
162         node.kit().watch(carsShardActor);
163         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
164         node.kit().expectTerminated(carsShardActor);
165
166         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
167                 .get(5, TimeUnit.SECONDS);
168         assertFalse("isSuccessful", rpcResult.isSuccessful());
169         assertEquals("getErrors", 1, rpcResult.getErrors().size());
170     }
171
172     private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot,
173             String... expShardNames) {
174         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
175         Set<String> shardNames = new HashSet<>();
176         for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
177             shardNames.add(s.getName());
178         }
179
180         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
181     }
182
183     @Test
184     public void testAddRemovePrefixShardReplica() throws Exception {
185         String name = "testAddPrefixShardReplica";
186         String moduleShardsConfig = "module-shards-default.conf";
187
188         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
189                 .moduleShardsConfig(moduleShardsConfig).build();
190         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
191                 .moduleShardsConfig(moduleShardsConfig).build();
192         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
193                 .moduleShardsConfig(moduleShardsConfig).build();
194
195         member1.waitForMembersUp("member-2", "member-3");
196         replicaNode2.kit().waitForMembersUp("member-1", "member-3");
197         replicaNode3.kit().waitForMembersUp("member-1", "member-2");
198
199         final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
200
201         shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
202                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
203                         "prefix", Collections.singleton(MEMBER_1))),
204                 ActorRef.noSender());
205
206         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
207                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
208
209         final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
210         final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
211         Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
212
213         addPrefixShardReplica(replicaNode2, identifier, serializer,
214                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
215
216         addPrefixShardReplica(replicaNode3, identifier, serializer,
217                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
218
219         verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
220                 "member-2", "member-3");
221
222         removePrefixShardReplica(member1, identifier, "member-3", serializer,
223                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
224
225         verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
226         verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
227                 "member-1");
228
229         removePrefixShardReplica(member1, identifier, "member-2", serializer,
230                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
231
232         verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
233     }
234
235     @Test
236     public void testAddShardReplica() throws Exception {
237         String name = "testAddShardReplica";
238         String moduleShardsConfig = "module-shards-cars-member-1.conf";
239         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
240                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
241
242         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
243                 .moduleShardsConfig(moduleShardsConfig).build();
244
245         leaderNode1.waitForMembersUp("member-2");
246
247         doAddShardReplica(newReplicaNode2, "cars", "member-1");
248
249         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
250                 .moduleShardsConfig(moduleShardsConfig).build();
251
252         leaderNode1.waitForMembersUp("member-3");
253         newReplicaNode2.waitForMembersUp("member-3");
254
255         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
256
257         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
258         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
259
260         // Write data to member-2's config datastore and read/verify via member-3
261         final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
262                 newReplicaNode3.configDataStore());
263
264         // Write data to member-3's oper datastore and read/verify via member-2
265         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
266
267         // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
268         // 2 ServerConfigurationPayload entries and the transaction payload entry.
269
270         RaftStateVerifier verifier = raftState -> {
271             assertEquals("Commit index", 2, raftState.getCommitIndex());
272             assertEquals("Last applied index", 2, raftState.getLastApplied());
273         };
274
275         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
276         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
277
278         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
279         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
280
281         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
282         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
283
284         // Restart member-3 and verify the cars config shard is re-instated.
285
286         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
287         newReplicaNode3.cleanup();
288
289         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
290                 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
291
292         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
293         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
294     }
295
296     @Test
297     public void testAddShardReplicaFailures() throws Exception {
298         String name = "testAddShardReplicaFailures";
299         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
300                 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
301
302         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
303                 memberNode.operDataStore(), null);
304
305         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
306                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
307         verifyFailedRpcResult(rpcResult);
308
309         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
310                 .build()).get(10, TimeUnit.SECONDS);
311         verifyFailedRpcResult(rpcResult);
312
313         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
314                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
315         verifyFailedRpcResult(rpcResult);
316     }
317
318     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(AbstractDataStore writeToStore,
319             AbstractDataStore readFromStore) throws Exception {
320         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
321         NormalizedNode<?, ?> carsNode = CarsModel.create();
322         writeTx.write(CarsModel.BASE_PATH, carsNode);
323
324         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
325         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
326         assertEquals("canCommit", TRUE, canCommit);
327         cohort.preCommit().get(5, TimeUnit.SECONDS);
328         cohort.commit().get(5, TimeUnit.SECONDS);
329
330         readCarsNodeAndVerify(readFromStore, carsNode);
331         return carsNode;
332     }
333
334     private static void readCarsNodeAndVerify(AbstractDataStore readFromStore,
335             NormalizedNode<?, ?> expCarsNode) throws Exception {
336         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
337                 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
338         assertTrue("isPresent", optional.isPresent());
339         assertEquals("Data node", expCarsNode, optional.get());
340     }
341
342     private void addPrefixShardReplica(final MemberNode memberNode,
343                                        final InstanceIdentifier<?> identifier,
344                                        final BindingNormalizedNodeSerializer serializer,
345                                        final String shardName,
346                                        final String... peerMemberNames) throws Exception {
347
348         final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
349                 .setShardPrefix(identifier)
350                 .setDataStoreType(DataStoreType.Config).build();
351
352         final ClusterAdminRpcService service =
353                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
354
355         final RpcResult<Void> rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS);
356         verifySuccessfulRpcResult(rpcResult);
357
358         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
359         Optional<ActorRef> optional = memberNode.configDataStore().getActorContext().findLocalShard(shardName);
360         assertTrue("Replica shard not present", optional.isPresent());
361     }
362
363     private void removePrefixShardReplica(final MemberNode memberNode,
364                                           final InstanceIdentifier<?> identifier,
365                                           final String removeFromMember,
366                                           final BindingNormalizedNodeSerializer serializer,
367                                           final String shardName,
368                                           final String... peerMemberNames) throws Exception {
369         final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
370                 .setDataStoreType(DataStoreType.Config)
371                 .setShardPrefix(identifier)
372                 .setMemberName(removeFromMember).build();
373
374         final ClusterAdminRpcService service =
375                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
376
377         final RpcResult<Void> rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS);
378         verifySuccessfulRpcResult(rpcResult);
379
380         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
381     }
382
383     private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
384             throws Exception {
385         memberNode.waitForMembersUp(peerMemberNames);
386
387         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
388                 memberNode.operDataStore(), null);
389
390         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
391                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
392         verifySuccessfulRpcResult(rpcResult);
393
394         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
395
396         Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
397         assertFalse("Oper shard present", optional.isPresent());
398
399         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
400                 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
401         verifySuccessfulRpcResult(rpcResult);
402
403         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
404     }
405
406     private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
407         if (!rpcResult.isSuccessful()) {
408             if (rpcResult.getErrors().size() > 0) {
409                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
410                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
411             }
412
413             fail("Rpc failed with no error");
414         }
415
416         return rpcResult.getResult();
417     }
418
419     private static void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
420         assertFalse("RpcResult", rpcResult.isSuccessful());
421         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
422         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
423         assertNotNull("RpcResult error message null", error.getMessage());
424     }
425
426     @Test
427     public void testRemoveShardReplica() throws Exception {
428         String name = "testRemoveShardReplica";
429         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
430         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
431                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
432                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
433                 .build();
434
435         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
436                 .moduleShardsConfig(moduleShardsConfig).build();
437
438         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
439                 .moduleShardsConfig(moduleShardsConfig).build();
440
441         leaderNode1.configDataStore().waitTillReady();
442         replicaNode3.configDataStore().waitTillReady();
443         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
444         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
445         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
446
447         // Invoke RPC service on member-3 to remove it's local shard
448
449         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
450                 replicaNode3.operDataStore(), null);
451
452         RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
453                 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
454                 .get(10, TimeUnit.SECONDS);
455         verifySuccessfulRpcResult(rpcResult);
456
457         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
458         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
459         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
460
461         // Restart member-2 and verify member-3 isn't present.
462
463         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
464         replicaNode2.cleanup();
465
466         MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
467                 .moduleShardsConfig(moduleShardsConfig).build();
468
469         newPeplicaNode2.configDataStore().waitTillReady();
470         verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
471
472         // Invoke RPC service on member-1 to remove member-2
473
474         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
475                 leaderNode1.operDataStore(), null);
476
477         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
478                 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
479         verifySuccessfulRpcResult(rpcResult);
480
481         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
482         verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
483     }
484
485     @Test
486     public void testRemoveShardLeaderReplica() throws Exception {
487         String name = "testRemoveShardLeaderReplica";
488         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
489         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
490                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
491                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
492                 .build();
493
494         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
495                 .moduleShardsConfig(moduleShardsConfig).build();
496
497         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
498                 .moduleShardsConfig(moduleShardsConfig).build();
499
500         leaderNode1.configDataStore().waitTillReady();
501         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
502         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
503         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
504
505         replicaNode2.waitForMembersUp("member-1", "member-3");
506         replicaNode3.waitForMembersUp("member-1", "member-2");
507
508         // Invoke RPC service on leader member-1 to remove it's local shard
509
510         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
511                 leaderNode1.operDataStore(), null);
512
513         RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
514                 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
515                 .get(10, TimeUnit.SECONDS);
516         verifySuccessfulRpcResult(rpcResult);
517
518         verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
519                 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
520                         containsString("member-3"))));
521
522         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
523         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
524         verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
525     }
526
527     @Test
528     public void testAddReplicasForAllShards() throws Exception {
529         String name = "testAddReplicasForAllShards";
530         String moduleShardsConfig = "module-shards-member1.conf";
531         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
532                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
533
534         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
535                 "pets", null, Arrays.asList(MEMBER_1));
536         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
537                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
538         leaderNode1.kit().expectMsgClass(Success.class);
539         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
540
541         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
542                 .moduleShardsConfig(moduleShardsConfig).build();
543
544         leaderNode1.waitForMembersUp("member-2");
545         newReplicaNode2.waitForMembersUp("member-1");
546
547         newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
548                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
549         newReplicaNode2.kit().expectMsgClass(Success.class);
550
551         newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
552                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
553                         "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null),
554                                 newReplicaNode2.kit().getRef());
555         newReplicaNode2.kit().expectMsgClass(Success.class);
556
557         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
558                 newReplicaNode2.operDataStore(), null);
559
560         RpcResult<AddReplicasForAllShardsOutput> rpcResult =
561                 service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
562         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
563         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
564                 successShardResult("people", DataStoreType.Config),
565                 successShardResult("pets", DataStoreType.Config),
566                 successShardResult("cars", DataStoreType.Operational),
567                 successShardResult("people", DataStoreType.Operational),
568                 failedShardResult("no-leader", DataStoreType.Operational));
569
570         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
571         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
572         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
573         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
574         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
575     }
576
577     @Test
578     public void testRemoveAllShardReplicas() throws Exception {
579         String name = "testRemoveAllShardReplicas";
580         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
581         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
582                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
583                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
584                 .build();
585
586         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
587                 .moduleShardsConfig(moduleShardsConfig).build();
588
589         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
590                 .moduleShardsConfig(moduleShardsConfig).build();
591
592         leaderNode1.configDataStore().waitTillReady();
593         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
594         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
595         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
596
597         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
598                 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
599         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
600                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
601         leaderNode1.kit().expectMsgClass(Success.class);
602
603         replicaNode2.configDataStore().getActorContext().getShardManager().tell(
604                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
605         replicaNode2.kit().expectMsgClass(Success.class);
606
607         replicaNode3.configDataStore().getActorContext().getShardManager().tell(
608                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
609         replicaNode3.kit().expectMsgClass(Success.class);
610
611         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
612         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
613         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
614
615         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
616                 replicaNode3.operDataStore(), null);
617
618         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
619                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
620         RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
621         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
622                 successShardResult("people", DataStoreType.Config),
623                 successShardResult("pets", DataStoreType.Config),
624                 successShardResult("cars", DataStoreType.Operational),
625                 successShardResult("people", DataStoreType.Operational));
626
627         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
628         verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
629         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
630         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
631         verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
632         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
633         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
634         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
635         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
636     }
637
638     @Test
639     public void testChangeMemberVotingStatesForShard() throws Exception {
640         String name = "testChangeMemberVotingStatusForShard";
641         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
642         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
643                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
644                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
645                 .build();
646
647         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
648                 .moduleShardsConfig(moduleShardsConfig).build();
649
650         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
651                 .moduleShardsConfig(moduleShardsConfig).build();
652
653         leaderNode1.configDataStore().waitTillReady();
654         replicaNode3.configDataStore().waitTillReady();
655         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
656         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
657         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
658
659         // Invoke RPC service on member-3 to change voting status
660
661         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
662                 replicaNode3.operDataStore(), null);
663
664         RpcResult<Void> rpcResult = service3
665                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
666                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
667                         .setMemberVotingState(ImmutableList.of(
668                                 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
669                                 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
670                         .build())
671                 .get(10, TimeUnit.SECONDS);
672         verifySuccessfulRpcResult(rpcResult);
673
674         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
675                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
676         verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
677                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
678         verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
679                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
680     }
681
682     @Test
683     public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
684         String name = "testChangeMemberVotingStatesForSingleNodeShard";
685         String moduleShardsConfig = "module-shards-member1.conf";
686         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
687                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
688                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
689                 .build();
690
691         leaderNode.configDataStore().waitTillReady();
692
693         // Invoke RPC service on member-3 to change voting status
694
695         ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
696                 leaderNode.operDataStore(), null);
697
698         RpcResult<Void> rpcResult = service
699                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
700                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
701                         .setMemberVotingState(ImmutableList
702                                 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
703                         .build())
704                 .get(10, TimeUnit.SECONDS);
705         verifyFailedRpcResult(rpcResult);
706
707         verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
708     }
709
710     @Test
711     public void testChangeMemberVotingStatesForAllShards() throws Exception {
712         String name = "testChangeMemberVotingStatesForAllShards";
713         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
714         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
715                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
716                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
717                 .build();
718
719         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
720                 .moduleShardsConfig(moduleShardsConfig).build();
721
722         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
723                 .moduleShardsConfig(moduleShardsConfig).build();
724
725         leaderNode1.configDataStore().waitTillReady();
726         leaderNode1.operDataStore().waitTillReady();
727         replicaNode3.configDataStore().waitTillReady();
728         replicaNode3.operDataStore().waitTillReady();
729         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
730         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
731         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
732
733         // Invoke RPC service on member-3 to change voting status
734
735         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
736                 replicaNode3.operDataStore(), null);
737
738         RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
739                 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
740                         new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
741                         new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
742                 .get(10, TimeUnit.SECONDS);
743         ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
744         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
745                 successShardResult("people", DataStoreType.Config),
746                 successShardResult("cars", DataStoreType.Operational),
747                 successShardResult("people", DataStoreType.Operational));
748
749         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
750                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
751                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
752                 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
753                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
754     }
755
756     @Test
757     public void testFlipMemberVotingStates() throws Exception {
758         String name = "testFlipMemberVotingStates";
759
760         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
761                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
762                 new ServerInfo("member-3", false)));
763
764         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
765         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
766         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
767
768         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
769         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
770                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
771                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
772                 .build();
773
774         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
775                 .moduleShardsConfig(moduleShardsConfig).build();
776
777         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
778                 .moduleShardsConfig(moduleShardsConfig).build();
779
780         leaderNode1.configDataStore().waitTillReady();
781         leaderNode1.operDataStore().waitTillReady();
782         replicaNode3.configDataStore().waitTillReady();
783         replicaNode3.operDataStore().waitTillReady();
784         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
785                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
786
787         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
788                 replicaNode3.operDataStore(), null);
789
790         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards()
791                 .get(10, TimeUnit.SECONDS);
792         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
793         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
794                 successShardResult("people", DataStoreType.Config),
795                 successShardResult("cars", DataStoreType.Operational),
796                 successShardResult("people", DataStoreType.Operational));
797
798         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
799                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
800                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
801                 new String[]{"cars", "people"},
802                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
803                 new SimpleEntry<>("member-3", TRUE));
804
805         // Leadership should have transferred to member 3 since it is the only remaining voting member.
806         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
807             assertNotNull("Expected non-null leader Id", raftState.getLeader());
808             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
809                     raftState.getLeader().contains("member-3"));
810         });
811
812         verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
813             assertNotNull("Expected non-null leader Id", raftState.getLeader());
814             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
815                     raftState.getLeader().contains("member-3"));
816         });
817
818         // Flip the voting states back to the original states.
819
820         rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
821         result = verifySuccessfulRpcResult(rpcResult);
822         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
823                 successShardResult("people", DataStoreType.Config),
824                 successShardResult("cars", DataStoreType.Operational),
825                 successShardResult("people", DataStoreType.Operational));
826
827         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
828                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
829                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
830                 new String[]{"cars", "people"},
831                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
832                 new SimpleEntry<>("member-3", FALSE));
833
834         // Leadership should have transferred to member 1 or 2.
835         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
836             assertNotNull("Expected non-null leader Id", raftState.getLeader());
837             assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
838                     raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
839         });
840     }
841
842     @Test
843     public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
844         String name = "testFlipMemberVotingStatesWithNoInitialLeader";
845
846         // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
847         // non-voting and simulated as down by not starting them up.
848         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
849                 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
850                 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
851                 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
852
853         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
854         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
855         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
856
857         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
858         final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
859                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
860                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
861                 .build();
862
863         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
864                 .moduleShardsConfig(moduleShardsConfig).build();
865
866         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
867                 .moduleShardsConfig(moduleShardsConfig).build();
868
869         // Initially there won't be a leader b/c all the up nodes are non-voting.
870
871         replicaNode1.waitForMembersUp("member-2", "member-3");
872
873         verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
874                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
875                 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
876                 new SimpleEntry<>("member-6", TRUE));
877
878         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
879             assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
880
881         ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
882                 replicaNode1.operDataStore(), null);
883
884         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
885                 .get(10, TimeUnit.SECONDS);
886         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
887         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
888                 successShardResult("people", DataStoreType.Config),
889                 successShardResult("cars", DataStoreType.Operational),
890                 successShardResult("people", DataStoreType.Operational));
891
892         verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
893                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
894                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
895                 new String[]{"cars", "people"},
896                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
897                 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
898                 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
899
900         // Since member 1 was changed to voting and there was no leader, it should've started and election
901         // and become leader
902         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
903             assertNotNull("Expected non-null leader Id", raftState.getLeader());
904             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
905                     raftState.getLeader().contains("member-1"));
906         });
907
908         verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
909             assertNotNull("Expected non-null leader Id", raftState.getLeader());
910             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
911                     raftState.getLeader().contains("member-1"));
912         });
913     }
914
915     @Test
916     public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
917         String name = "testFlipMemberVotingStatesWithVotingMembersDown";
918
919         // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
920         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
921                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
922                 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
923                 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
924
925         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
926         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
927         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
928
929         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
930         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
931                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
932                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
933                 .build();
934
935         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
936                 .moduleShardsConfig(moduleShardsConfig).build();
937
938         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
939                 .moduleShardsConfig(moduleShardsConfig).build();
940
941         leaderNode1.configDataStore().waitTillReady();
942         leaderNode1.operDataStore().waitTillReady();
943         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
944                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
945                 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
946                 new SimpleEntry<>("member-6", FALSE));
947
948         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
949                 leaderNode1.operDataStore(), null);
950
951         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
952                 .get(10, TimeUnit.SECONDS);
953         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
954         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
955                 successShardResult("people", DataStoreType.Config),
956                 successShardResult("cars", DataStoreType.Operational),
957                 successShardResult("people", DataStoreType.Operational));
958
959         // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
960         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
961                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
962                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
963                 new String[]{"cars", "people"},
964                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
965                 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
966                 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
967
968         // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
969         // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
970         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
971             assertNotNull("Expected non-null leader Id", raftState.getLeader());
972             assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
973         });
974     }
975
976     private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
977             String member, String datastoreTypeSuffix, String... shards) {
978         String[] datastoreTypes = {"config_", "oper_"};
979         for (String type : datastoreTypes) {
980             for (String shard : shards) {
981                 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
982                 for (ServerInfo info : serverConfig.getServerConfig()) {
983                     newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
984                             type + datastoreTypeSuffix).toString(), info.isVoting()));
985                 }
986
987                 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
988                         type + datastoreTypeSuffix).toString();
989                 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
990                 InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1,
991                         new ServerConfigurationPayload(newServerInfo)));
992             }
993         }
994     }
995
996     @SafeVarargs
997     private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards,
998             SimpleEntry<String, Boolean>... expStates) throws Exception {
999         for (AbstractDataStore datastore: datastores) {
1000             for (String shard: shards) {
1001                 verifyVotingStates(datastore, shard, expStates);
1002             }
1003         }
1004     }
1005
1006     @SafeVarargs
1007     private static void verifyVotingStates(AbstractDataStore datastore, String shardName,
1008             SimpleEntry<String, Boolean>... expStates) throws Exception {
1009         String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
1010         Map<String, Boolean> expStateMap = new HashMap<>();
1011         for (Entry<String, Boolean> e: expStates) {
1012             expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
1013                     datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
1014         }
1015
1016         verifyRaftState(datastore, shardName, raftState -> {
1017             String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
1018                     datastore.getActorContext().getDataStoreName()).toString();
1019             assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
1020             for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1021                 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
1022             }
1023         });
1024     }
1025
1026     private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
1027         Map<String, ShardResult> expResultsMap = new HashMap<>();
1028         for (ShardResult r: expShardResults) {
1029             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1030         }
1031
1032         for (ShardResult result: shardResults) {
1033             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1034             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1035                     result.getDataStoreType()), exp);
1036             assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
1037             if (exp.isSucceeded()) {
1038                 assertNull("Expected null error message", result.getErrorMessage());
1039             } else {
1040                 assertNotNull("Expected error message", result.getErrorMessage());
1041             }
1042         }
1043
1044         if (!expResultsMap.isEmpty()) {
1045             fail("Missing shard results for " + expResultsMap.keySet());
1046         }
1047     }
1048
1049     private static ShardResult successShardResult(String shardName, DataStoreType type) {
1050         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1051     }
1052
1053     private static ShardResult failedShardResult(String shardName, DataStoreType type) {
1054         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();
1055     }
1056 }