Bug 7805: Add make-leader-local RPC for module-based shards.
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertFalse;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertThat;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
24
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Sets;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.net.URI;
38 import java.util.AbstractMap.SimpleEntry;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Map.Entry;
47 import java.util.Set;
48 import java.util.concurrent.TimeUnit;
49 import org.apache.commons.lang3.SerializationUtils;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.Mockito;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
57 import org.opendaylight.controller.cluster.datastore.MemberNode;
58 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
66 import org.opendaylight.controller.cluster.raft.RaftState;
67 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
69 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
74 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
77 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
78 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
79 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
100 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
101 import org.opendaylight.yangtools.yang.common.RpcError;
102 import org.opendaylight.yangtools.yang.common.RpcResult;
103 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
104
105 /**
106  * Unit tests for ClusterAdminRpcService.
107  *
108  * @author Thomas Pantelis
109  */
110 public class ClusterAdminRpcServiceTest {
111     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
112     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
113     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
114     private final List<MemberNode> memberNodes = new ArrayList<>();
115
116     @Before
117     public void setUp() {
118         InMemoryJournal.clear();
119         InMemorySnapshotStore.clear();
120     }
121
122     @After
123     public void tearDown() {
124         for (MemberNode m : Lists.reverse(memberNodes)) {
125             m.cleanup();
126         }
127         memberNodes.clear();
128     }
129
130     @Test
131     public void testBackupDatastore() throws Exception {
132         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
133                 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
134                 .testName("testBackupDatastore").build();
135
136         String fileName = "target/testBackupDatastore";
137         new File(fileName).delete();
138
139         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
140
141         RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
142                 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
143         verifySuccessfulRpcResult(rpcResult);
144
145         try (FileInputStream fis = new FileInputStream(fileName)) {
146             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
147             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
148
149             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
150                     snapshots.get(1).getType(), snapshots.get(1));
151             verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(),
152                     map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people");
153         } finally {
154             new File(fileName).delete();
155         }
156
157         // Test failure by killing a shard.
158
159         node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder()
160                 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
161
162         ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
163         node.kit().watch(carsShardActor);
164         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
165         node.kit().expectTerminated(carsShardActor);
166
167         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
168                 .get(5, TimeUnit.SECONDS);
169         assertFalse("isSuccessful", rpcResult.isSuccessful());
170         assertEquals("getErrors", 1, rpcResult.getErrors().size());
171     }
172
173     private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot,
174             String... expShardNames) {
175         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
176         Set<String> shardNames = new HashSet<>();
177         for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
178             shardNames.add(s.getName());
179         }
180
181         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
182     }
183
184     @Test
185     public void testAddRemovePrefixShardReplica() throws Exception {
186         String name = "testAddPrefixShardReplica";
187         String moduleShardsConfig = "module-shards-default.conf";
188
189         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
190                 .moduleShardsConfig(moduleShardsConfig).build();
191         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
192                 .moduleShardsConfig(moduleShardsConfig).build();
193         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
194                 .moduleShardsConfig(moduleShardsConfig).build();
195
196         member1.waitForMembersUp("member-2", "member-3");
197         replicaNode2.kit().waitForMembersUp("member-1", "member-3");
198         replicaNode3.kit().waitForMembersUp("member-1", "member-2");
199
200         final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
201
202         shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
203                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
204                         "prefix", Collections.singleton(MEMBER_1))),
205                 ActorRef.noSender());
206
207         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
208                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
209
210         final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
211         final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
212         Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
213
214         addPrefixShardReplica(replicaNode2, identifier, serializer,
215                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
216
217         addPrefixShardReplica(replicaNode3, identifier, serializer,
218                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
219
220         verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
221                 "member-2", "member-3");
222
223         removePrefixShardReplica(member1, identifier, "member-3", serializer,
224                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
225
226         verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
227         verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
228                 "member-1");
229
230         removePrefixShardReplica(member1, identifier, "member-2", serializer,
231                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
232
233         verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
234     }
235
236     @Test
237     public void testModuleShardLeaderMovement() throws Exception {
238         String name = "testModuleShardLeaderMovement";
239         String moduleShardsConfig = "module-shards-member1.conf";
240
241         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
242                 .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
243         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
244                 .moduleShardsConfig(moduleShardsConfig).build();
245         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
246                 .moduleShardsConfig(moduleShardsConfig).build();
247
248         member1.waitForMembersUp("member-2", "member-3");
249
250         doAddShardReplica(replicaNode2, "cars", "member-1");
251         doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
252
253         verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
254
255         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
256
257         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
258
259         doMakeShardLeaderLocal(member1, "cars", "member-1");
260         replicaNode2.kit().waitUntilLeader(replicaNode2.configDataStore().getActorContext(), "cars");
261         replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars");
262
263         doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
264         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "cars");
265         replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars");
266
267         doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
268     }
269
270     @Test
271     public void testAddShardReplica() throws Exception {
272         String name = "testAddShardReplica";
273         String moduleShardsConfig = "module-shards-cars-member-1.conf";
274         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
275                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
276
277         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
278                 .moduleShardsConfig(moduleShardsConfig).build();
279
280         leaderNode1.waitForMembersUp("member-2");
281
282         doAddShardReplica(newReplicaNode2, "cars", "member-1");
283
284         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
285                 .moduleShardsConfig(moduleShardsConfig).build();
286
287         leaderNode1.waitForMembersUp("member-3");
288         newReplicaNode2.waitForMembersUp("member-3");
289
290         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
291
292         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
293         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
294
295         // Write data to member-2's config datastore and read/verify via member-3
296         final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
297                 newReplicaNode3.configDataStore());
298
299         // Write data to member-3's oper datastore and read/verify via member-2
300         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
301
302         // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
303         // 2 ServerConfigurationPayload entries and the transaction payload entry.
304
305         RaftStateVerifier verifier = raftState -> {
306             assertEquals("Commit index", 2, raftState.getCommitIndex());
307             assertEquals("Last applied index", 2, raftState.getLastApplied());
308         };
309
310         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
311         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
312
313         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
314         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
315
316         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
317         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
318
319         // Restart member-3 and verify the cars config shard is re-instated.
320
321         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
322         newReplicaNode3.cleanup();
323
324         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
325                 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
326
327         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
328         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
329     }
330
331     @Test
332     public void testAddShardReplicaFailures() throws Exception {
333         String name = "testAddShardReplicaFailures";
334         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
335                 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
336
337         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
338                 memberNode.operDataStore(), null);
339
340         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
341                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
342         verifyFailedRpcResult(rpcResult);
343
344         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
345                 .build()).get(10, TimeUnit.SECONDS);
346         verifyFailedRpcResult(rpcResult);
347
348         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
349                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
350         verifyFailedRpcResult(rpcResult);
351     }
352
353     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(AbstractDataStore writeToStore,
354             AbstractDataStore readFromStore) throws Exception {
355         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
356         NormalizedNode<?, ?> carsNode = CarsModel.create();
357         writeTx.write(CarsModel.BASE_PATH, carsNode);
358
359         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
360         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
361         assertEquals("canCommit", TRUE, canCommit);
362         cohort.preCommit().get(5, TimeUnit.SECONDS);
363         cohort.commit().get(5, TimeUnit.SECONDS);
364
365         readCarsNodeAndVerify(readFromStore, carsNode);
366         return carsNode;
367     }
368
369     private static void readCarsNodeAndVerify(AbstractDataStore readFromStore,
370             NormalizedNode<?, ?> expCarsNode) throws Exception {
371         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
372                 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
373         assertTrue("isPresent", optional.isPresent());
374         assertEquals("Data node", expCarsNode, optional.get());
375     }
376
377     private void addPrefixShardReplica(final MemberNode memberNode,
378                                        final InstanceIdentifier<?> identifier,
379                                        final BindingNormalizedNodeSerializer serializer,
380                                        final String shardName,
381                                        final String... peerMemberNames) throws Exception {
382
383         final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
384                 .setShardPrefix(identifier)
385                 .setDataStoreType(DataStoreType.Config).build();
386
387         final ClusterAdminRpcService service =
388                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
389
390         final RpcResult<Void> rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS);
391         verifySuccessfulRpcResult(rpcResult);
392
393         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
394         Optional<ActorRef> optional = memberNode.configDataStore().getActorContext().findLocalShard(shardName);
395         assertTrue("Replica shard not present", optional.isPresent());
396     }
397
398     private void removePrefixShardReplica(final MemberNode memberNode,
399                                           final InstanceIdentifier<?> identifier,
400                                           final String removeFromMember,
401                                           final BindingNormalizedNodeSerializer serializer,
402                                           final String shardName,
403                                           final String... peerMemberNames) throws Exception {
404         final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
405                 .setDataStoreType(DataStoreType.Config)
406                 .setShardPrefix(identifier)
407                 .setMemberName(removeFromMember).build();
408
409         final ClusterAdminRpcService service =
410                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
411
412         final RpcResult<Void> rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS);
413         verifySuccessfulRpcResult(rpcResult);
414
415         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
416     }
417
418     private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
419             throws Exception {
420         memberNode.waitForMembersUp(peerMemberNames);
421
422         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
423                 memberNode.operDataStore(), null);
424
425         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
426                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
427         verifySuccessfulRpcResult(rpcResult);
428
429         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
430
431         Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
432         assertFalse("Oper shard present", optional.isPresent());
433
434         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
435                 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
436         verifySuccessfulRpcResult(rpcResult);
437
438         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
439     }
440
441     private static void doMakeShardLeaderLocal(final MemberNode memberNode, String shardName, String newLeader)
442             throws Exception {
443         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
444                 memberNode.operDataStore(), null);
445
446         final RpcResult<Void> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
447                 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
448                 .get(10, TimeUnit.SECONDS);
449
450         verifySuccessfulRpcResult(rpcResult);
451
452         verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
453                 containsString(newLeader)));
454
455     }
456
457     private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
458         if (!rpcResult.isSuccessful()) {
459             if (rpcResult.getErrors().size() > 0) {
460                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
461                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
462             }
463
464             fail("Rpc failed with no error");
465         }
466
467         return rpcResult.getResult();
468     }
469
470     private static void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
471         assertFalse("RpcResult", rpcResult.isSuccessful());
472         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
473         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
474         assertNotNull("RpcResult error message null", error.getMessage());
475     }
476
477     @Test
478     public void testRemoveShardReplica() throws Exception {
479         String name = "testRemoveShardReplica";
480         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
481         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
482                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
483                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
484                 .build();
485
486         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
487                 .moduleShardsConfig(moduleShardsConfig).build();
488
489         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
490                 .moduleShardsConfig(moduleShardsConfig).build();
491
492         leaderNode1.configDataStore().waitTillReady();
493         replicaNode3.configDataStore().waitTillReady();
494         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
495         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
496         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
497
498         // Invoke RPC service on member-3 to remove it's local shard
499
500         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
501                 replicaNode3.operDataStore(), null);
502
503         RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
504                 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
505                 .get(10, TimeUnit.SECONDS);
506         verifySuccessfulRpcResult(rpcResult);
507
508         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
509         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
510         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
511
512         // Restart member-2 and verify member-3 isn't present.
513
514         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
515         replicaNode2.cleanup();
516
517         MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
518                 .moduleShardsConfig(moduleShardsConfig).build();
519
520         newPeplicaNode2.configDataStore().waitTillReady();
521         verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
522
523         // Invoke RPC service on member-1 to remove member-2
524
525         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
526                 leaderNode1.operDataStore(), null);
527
528         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
529                 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
530         verifySuccessfulRpcResult(rpcResult);
531
532         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
533         verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
534     }
535
536     @Test
537     public void testRemoveShardLeaderReplica() throws Exception {
538         String name = "testRemoveShardLeaderReplica";
539         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
540         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
541                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
542                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
543                 .build();
544
545         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
546                 .moduleShardsConfig(moduleShardsConfig).build();
547
548         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
549                 .moduleShardsConfig(moduleShardsConfig).build();
550
551         leaderNode1.configDataStore().waitTillReady();
552         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
553         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
554         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
555
556         replicaNode2.waitForMembersUp("member-1", "member-3");
557         replicaNode3.waitForMembersUp("member-1", "member-2");
558
559         // Invoke RPC service on leader member-1 to remove it's local shard
560
561         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
562                 leaderNode1.operDataStore(), null);
563
564         RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
565                 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
566                 .get(10, TimeUnit.SECONDS);
567         verifySuccessfulRpcResult(rpcResult);
568
569         verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
570                 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
571                         containsString("member-3"))));
572
573         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
574         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
575         verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
576     }
577
578     @Test
579     public void testAddReplicasForAllShards() throws Exception {
580         String name = "testAddReplicasForAllShards";
581         String moduleShardsConfig = "module-shards-member1.conf";
582         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
583                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
584
585         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
586                 "pets", null, Arrays.asList(MEMBER_1));
587         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
588                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
589         leaderNode1.kit().expectMsgClass(Success.class);
590         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
591
592         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
593                 .moduleShardsConfig(moduleShardsConfig).build();
594
595         leaderNode1.waitForMembersUp("member-2");
596         newReplicaNode2.waitForMembersUp("member-1");
597
598         newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
599                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
600         newReplicaNode2.kit().expectMsgClass(Success.class);
601
602         newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
603                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
604                         "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null),
605                                 newReplicaNode2.kit().getRef());
606         newReplicaNode2.kit().expectMsgClass(Success.class);
607
608         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
609                 newReplicaNode2.operDataStore(), null);
610
611         RpcResult<AddReplicasForAllShardsOutput> rpcResult =
612                 service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
613         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
614         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
615                 successShardResult("people", DataStoreType.Config),
616                 successShardResult("pets", DataStoreType.Config),
617                 successShardResult("cars", DataStoreType.Operational),
618                 successShardResult("people", DataStoreType.Operational),
619                 failedShardResult("no-leader", DataStoreType.Operational));
620
621         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
622         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
623         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
624         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
625         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
626     }
627
628     @Test
629     public void testRemoveAllShardReplicas() throws Exception {
630         String name = "testRemoveAllShardReplicas";
631         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
632         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
633                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
634                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
635                 .build();
636
637         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
638                 .moduleShardsConfig(moduleShardsConfig).build();
639
640         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
641                 .moduleShardsConfig(moduleShardsConfig).build();
642
643         leaderNode1.configDataStore().waitTillReady();
644         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
645         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
646         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
647
648         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
649                 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
650         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
651                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
652         leaderNode1.kit().expectMsgClass(Success.class);
653
654         replicaNode2.configDataStore().getActorContext().getShardManager().tell(
655                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
656         replicaNode2.kit().expectMsgClass(Success.class);
657
658         replicaNode3.configDataStore().getActorContext().getShardManager().tell(
659                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
660         replicaNode3.kit().expectMsgClass(Success.class);
661
662         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
663         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
664         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
665
666         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
667                 replicaNode3.operDataStore(), null);
668
669         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
670                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
671         RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
672         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
673                 successShardResult("people", DataStoreType.Config),
674                 successShardResult("pets", DataStoreType.Config),
675                 successShardResult("cars", DataStoreType.Operational),
676                 successShardResult("people", DataStoreType.Operational));
677
678         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
679         verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
680         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
681         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
682         verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
683         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
684         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
685         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
686         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
687     }
688
689     @Test
690     public void testChangeMemberVotingStatesForShard() throws Exception {
691         String name = "testChangeMemberVotingStatusForShard";
692         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
693         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
694                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
695                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
696                 .build();
697
698         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
699                 .moduleShardsConfig(moduleShardsConfig).build();
700
701         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
702                 .moduleShardsConfig(moduleShardsConfig).build();
703
704         leaderNode1.configDataStore().waitTillReady();
705         replicaNode3.configDataStore().waitTillReady();
706         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
707         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
708         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
709
710         // Invoke RPC service on member-3 to change voting status
711
712         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
713                 replicaNode3.operDataStore(), null);
714
715         RpcResult<Void> rpcResult = service3
716                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
717                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
718                         .setMemberVotingState(ImmutableList.of(
719                                 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
720                                 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
721                         .build())
722                 .get(10, TimeUnit.SECONDS);
723         verifySuccessfulRpcResult(rpcResult);
724
725         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
726                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
727         verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
728                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
729         verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
730                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
731     }
732
733     @Test
734     public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
735         String name = "testChangeMemberVotingStatesForSingleNodeShard";
736         String moduleShardsConfig = "module-shards-member1.conf";
737         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
738                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
739                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
740                 .build();
741
742         leaderNode.configDataStore().waitTillReady();
743
744         // Invoke RPC service on member-3 to change voting status
745
746         ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
747                 leaderNode.operDataStore(), null);
748
749         RpcResult<Void> rpcResult = service
750                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
751                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
752                         .setMemberVotingState(ImmutableList
753                                 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
754                         .build())
755                 .get(10, TimeUnit.SECONDS);
756         verifyFailedRpcResult(rpcResult);
757
758         verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
759     }
760
761     @Test
762     public void testChangeMemberVotingStatesForAllShards() throws Exception {
763         String name = "testChangeMemberVotingStatesForAllShards";
764         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
765         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
766                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
767                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
768                 .build();
769
770         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
771                 .moduleShardsConfig(moduleShardsConfig).build();
772
773         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
774                 .moduleShardsConfig(moduleShardsConfig).build();
775
776         leaderNode1.configDataStore().waitTillReady();
777         leaderNode1.operDataStore().waitTillReady();
778         replicaNode3.configDataStore().waitTillReady();
779         replicaNode3.operDataStore().waitTillReady();
780         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
781         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
782         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
783
784         // Invoke RPC service on member-3 to change voting status
785
786         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
787                 replicaNode3.operDataStore(), null);
788
789         RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
790                 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
791                         new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
792                         new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
793                 .get(10, TimeUnit.SECONDS);
794         ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
795         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
796                 successShardResult("people", DataStoreType.Config),
797                 successShardResult("cars", DataStoreType.Operational),
798                 successShardResult("people", DataStoreType.Operational));
799
800         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
801                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
802                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
803                 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
804                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
805     }
806
807     @Test
808     public void testFlipMemberVotingStates() throws Exception {
809         String name = "testFlipMemberVotingStates";
810
811         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
812                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
813                 new ServerInfo("member-3", false)));
814
815         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
816         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
817         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
818
819         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
820         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
821                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
822                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
823                 .build();
824
825         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
826                 .moduleShardsConfig(moduleShardsConfig).build();
827
828         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
829                 .moduleShardsConfig(moduleShardsConfig).build();
830
831         leaderNode1.configDataStore().waitTillReady();
832         leaderNode1.operDataStore().waitTillReady();
833         replicaNode3.configDataStore().waitTillReady();
834         replicaNode3.operDataStore().waitTillReady();
835         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
836                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
837
838         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
839                 replicaNode3.operDataStore(), null);
840
841         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards()
842                 .get(10, TimeUnit.SECONDS);
843         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
844         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
845                 successShardResult("people", DataStoreType.Config),
846                 successShardResult("cars", DataStoreType.Operational),
847                 successShardResult("people", DataStoreType.Operational));
848
849         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
850                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
851                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
852                 new String[]{"cars", "people"},
853                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
854                 new SimpleEntry<>("member-3", TRUE));
855
856         // Leadership should have transferred to member 3 since it is the only remaining voting member.
857         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
858             assertNotNull("Expected non-null leader Id", raftState.getLeader());
859             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
860                     raftState.getLeader().contains("member-3"));
861         });
862
863         verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
864             assertNotNull("Expected non-null leader Id", raftState.getLeader());
865             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
866                     raftState.getLeader().contains("member-3"));
867         });
868
869         // Flip the voting states back to the original states.
870
871         rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
872         result = verifySuccessfulRpcResult(rpcResult);
873         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
874                 successShardResult("people", DataStoreType.Config),
875                 successShardResult("cars", DataStoreType.Operational),
876                 successShardResult("people", DataStoreType.Operational));
877
878         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
879                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
880                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
881                 new String[]{"cars", "people"},
882                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
883                 new SimpleEntry<>("member-3", FALSE));
884
885         // Leadership should have transferred to member 1 or 2.
886         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
887             assertNotNull("Expected non-null leader Id", raftState.getLeader());
888             assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
889                     raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
890         });
891     }
892
893     @Test
894     public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
895         String name = "testFlipMemberVotingStatesWithNoInitialLeader";
896
897         // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
898         // non-voting and simulated as down by not starting them up.
899         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
900                 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
901                 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
902                 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
903
904         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
905         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
906         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
907
908         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
909         final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
910                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
911                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
912                 .build();
913
914         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
915                 .moduleShardsConfig(moduleShardsConfig).build();
916
917         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
918                 .moduleShardsConfig(moduleShardsConfig).build();
919
920         // Initially there won't be a leader b/c all the up nodes are non-voting.
921
922         replicaNode1.waitForMembersUp("member-2", "member-3");
923
924         verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
925                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
926                 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
927                 new SimpleEntry<>("member-6", TRUE));
928
929         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
930             assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
931
932         ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
933                 replicaNode1.operDataStore(), null);
934
935         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
936                 .get(10, TimeUnit.SECONDS);
937         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
938         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
939                 successShardResult("people", DataStoreType.Config),
940                 successShardResult("cars", DataStoreType.Operational),
941                 successShardResult("people", DataStoreType.Operational));
942
943         verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
944                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
945                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
946                 new String[]{"cars", "people"},
947                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
948                 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
949                 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
950
951         // Since member 1 was changed to voting and there was no leader, it should've started and election
952         // and become leader
953         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
954             assertNotNull("Expected non-null leader Id", raftState.getLeader());
955             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
956                     raftState.getLeader().contains("member-1"));
957         });
958
959         verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
960             assertNotNull("Expected non-null leader Id", raftState.getLeader());
961             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
962                     raftState.getLeader().contains("member-1"));
963         });
964     }
965
966     @Test
967     public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
968         String name = "testFlipMemberVotingStatesWithVotingMembersDown";
969
970         // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
971         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
972                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
973                 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
974                 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
975
976         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
977         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
978         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
979
980         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
981         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
982                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
983                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
984                 .build();
985
986         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
987                 .moduleShardsConfig(moduleShardsConfig).build();
988
989         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
990                 .moduleShardsConfig(moduleShardsConfig).build();
991
992         leaderNode1.configDataStore().waitTillReady();
993         leaderNode1.operDataStore().waitTillReady();
994         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
995                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
996                 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
997                 new SimpleEntry<>("member-6", FALSE));
998
999         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
1000                 leaderNode1.operDataStore(), null);
1001
1002         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
1003                 .get(10, TimeUnit.SECONDS);
1004         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1005         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1006                 successShardResult("people", DataStoreType.Config),
1007                 successShardResult("cars", DataStoreType.Operational),
1008                 successShardResult("people", DataStoreType.Operational));
1009
1010         // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
1011         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
1012                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1013                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1014                 new String[]{"cars", "people"},
1015                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
1016                 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
1017                 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
1018
1019         // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
1020         // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
1021         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1022             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1023             assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
1024         });
1025     }
1026
1027     private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
1028             String member, String datastoreTypeSuffix, String... shards) {
1029         String[] datastoreTypes = {"config_", "oper_"};
1030         for (String type : datastoreTypes) {
1031             for (String shard : shards) {
1032                 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
1033                 for (ServerInfo info : serverConfig.getServerConfig()) {
1034                     newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
1035                             type + datastoreTypeSuffix).toString(), info.isVoting()));
1036                 }
1037
1038                 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
1039                         type + datastoreTypeSuffix).toString();
1040                 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
1041                 InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1,
1042                         new ServerConfigurationPayload(newServerInfo)));
1043             }
1044         }
1045     }
1046
1047     @SafeVarargs
1048     private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards,
1049             SimpleEntry<String, Boolean>... expStates) throws Exception {
1050         for (AbstractDataStore datastore: datastores) {
1051             for (String shard: shards) {
1052                 verifyVotingStates(datastore, shard, expStates);
1053             }
1054         }
1055     }
1056
1057     @SafeVarargs
1058     private static void verifyVotingStates(AbstractDataStore datastore, String shardName,
1059             SimpleEntry<String, Boolean>... expStates) throws Exception {
1060         String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
1061         Map<String, Boolean> expStateMap = new HashMap<>();
1062         for (Entry<String, Boolean> e: expStates) {
1063             expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
1064                     datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
1065         }
1066
1067         verifyRaftState(datastore, shardName, raftState -> {
1068             String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
1069                     datastore.getActorContext().getDataStoreName()).toString();
1070             assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
1071             for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1072                 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
1073             }
1074         });
1075     }
1076
1077     private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
1078         Map<String, ShardResult> expResultsMap = new HashMap<>();
1079         for (ShardResult r: expShardResults) {
1080             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1081         }
1082
1083         for (ShardResult result: shardResults) {
1084             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1085             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1086                     result.getDataStoreType()), exp);
1087             assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
1088             if (exp.isSucceeded()) {
1089                 assertNull("Expected null error message", result.getErrorMessage());
1090             } else {
1091                 assertNotNull("Expected error message", result.getErrorMessage());
1092             }
1093         }
1094
1095         if (!expResultsMap.isEmpty()) {
1096             fail("Missing shard results for " + expResultsMap.keySet());
1097         }
1098     }
1099
1100     private static ShardResult successShardResult(String shardName, DataStoreType type) {
1101         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1102     }
1103
1104     private static ShardResult failedShardResult(String shardName, DataStoreType type) {
1105         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();
1106     }
1107 }