Remove deprecated persisted raft payloads
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertFalse;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertThat;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
24
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Sets;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.net.URI;
38 import java.util.AbstractMap.SimpleEntry;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Map.Entry;
47 import java.util.Set;
48 import java.util.concurrent.TimeUnit;
49 import org.apache.commons.lang3.SerializationUtils;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.Mockito;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
57 import org.opendaylight.controller.cluster.datastore.MemberNode;
58 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
66 import org.opendaylight.controller.cluster.raft.RaftState;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
74 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
77 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
78 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
79 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
100 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
101 import org.opendaylight.yangtools.yang.common.RpcError;
102 import org.opendaylight.yangtools.yang.common.RpcResult;
103 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
104
105 /**
106  * Unit tests for ClusterAdminRpcService.
107  *
108  * @author Thomas Pantelis
109  */
110 public class ClusterAdminRpcServiceTest {
111     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
112     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
113     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
114     private final List<MemberNode> memberNodes = new ArrayList<>();
115
116     @Before
117     public void setUp() {
118         InMemoryJournal.clear();
119         InMemorySnapshotStore.clear();
120     }
121
122     @After
123     public void tearDown() {
124         for (MemberNode m : Lists.reverse(memberNodes)) {
125             m.cleanup();
126         }
127         memberNodes.clear();
128     }
129
130     @Test
131     public void testBackupDatastore() throws Exception {
132         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
133                 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
134                 .testName("testBackupDatastore").build();
135
136         String fileName = "target/testBackupDatastore";
137         new File(fileName).delete();
138
139         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
140
141         RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
142                 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
143         verifySuccessfulRpcResult(rpcResult);
144
145         try (FileInputStream fis = new FileInputStream(fileName)) {
146             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
147             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
148
149             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
150                     snapshots.get(1).getType(), snapshots.get(1));
151             verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(),
152                     map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people");
153         } finally {
154             new File(fileName).delete();
155         }
156
157         // Test failure by killing a shard.
158
159         node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder()
160                 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
161
162         ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
163         node.kit().watch(carsShardActor);
164         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
165         node.kit().expectTerminated(carsShardActor);
166
167         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
168                 .get(5, TimeUnit.SECONDS);
169         assertFalse("isSuccessful", rpcResult.isSuccessful());
170         assertEquals("getErrors", 1, rpcResult.getErrors().size());
171     }
172
173     private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot,
174             String... expShardNames) {
175         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
176         Set<String> shardNames = new HashSet<>();
177         for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
178             shardNames.add(s.getName());
179         }
180
181         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
182     }
183
184     @Test
185     public void testAddRemovePrefixShardReplica() throws Exception {
186         String name = "testAddPrefixShardReplica";
187         String moduleShardsConfig = "module-shards-default.conf";
188
189         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
190                 .moduleShardsConfig(moduleShardsConfig).build();
191         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
192                 .moduleShardsConfig(moduleShardsConfig).build();
193         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
194                 .moduleShardsConfig(moduleShardsConfig).build();
195
196         member1.waitForMembersUp("member-2", "member-3");
197         replicaNode2.kit().waitForMembersUp("member-1", "member-3");
198         replicaNode3.kit().waitForMembersUp("member-1", "member-2");
199
200         final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
201
202         shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
203                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
204                         "prefix", Collections.singleton(MEMBER_1))),
205                 ActorRef.noSender());
206
207         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
208                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
209
210         final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
211         final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
212         Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
213
214         addPrefixShardReplica(replicaNode2, identifier, serializer,
215                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
216
217         addPrefixShardReplica(replicaNode3, identifier, serializer,
218                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
219
220         verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
221                 "member-2", "member-3");
222
223         removePrefixShardReplica(member1, identifier, "member-3", serializer,
224                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
225
226         verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
227         verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
228                 "member-1");
229
230         removePrefixShardReplica(member1, identifier, "member-2", serializer,
231                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
232
233         verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
234     }
235
236     @Test
237     public void testModuleShardLeaderMovement() throws Exception {
238         String name = "testModuleShardLeaderMovement";
239         String moduleShardsConfig = "module-shards-member1.conf";
240
241         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
242                 .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
243         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
244                 .moduleShardsConfig(moduleShardsConfig).build();
245         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
246                 .moduleShardsConfig(moduleShardsConfig).build();
247
248         member1.waitForMembersUp("member-2", "member-3");
249         replicaNode2.waitForMembersUp("member-1");
250         replicaNode3.waitForMembersUp("member-1", "member-2");
251
252         doAddShardReplica(replicaNode2, "cars", "member-1");
253         doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
254
255         verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
256
257         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
258
259         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
260
261         doMakeShardLeaderLocal(member1, "cars", "member-1");
262         verifyRaftState(replicaNode2.configDataStore(), "cars",
263             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
264         verifyRaftState(replicaNode3.configDataStore(), "cars",
265             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
266
267         doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
268         verifyRaftState(member1.configDataStore(), "cars",
269             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
270         verifyRaftState(replicaNode3.configDataStore(), "cars",
271             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
272
273         replicaNode2.waitForMembersUp("member-3");
274         doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
275     }
276
277     @Test
278     public void testAddShardReplica() throws Exception {
279         String name = "testAddShardReplica";
280         String moduleShardsConfig = "module-shards-cars-member-1.conf";
281         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
282                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
283
284         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
285                 .moduleShardsConfig(moduleShardsConfig).build();
286
287         leaderNode1.waitForMembersUp("member-2");
288
289         doAddShardReplica(newReplicaNode2, "cars", "member-1");
290
291         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
292                 .moduleShardsConfig(moduleShardsConfig).build();
293
294         leaderNode1.waitForMembersUp("member-3");
295         newReplicaNode2.waitForMembersUp("member-3");
296
297         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
298
299         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
300         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
301
302         // Write data to member-2's config datastore and read/verify via member-3
303         final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
304                 newReplicaNode3.configDataStore());
305
306         // Write data to member-3's oper datastore and read/verify via member-2
307         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
308
309         // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
310         // 2 ServerConfigurationPayload entries and the transaction payload entry.
311
312         RaftStateVerifier verifier = raftState -> {
313             assertEquals("Commit index", 2, raftState.getCommitIndex());
314             assertEquals("Last applied index", 2, raftState.getLastApplied());
315         };
316
317         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
318         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
319
320         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
321         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
322
323         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
324         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
325
326         // Restart member-3 and verify the cars config shard is re-instated.
327
328         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
329         newReplicaNode3.cleanup();
330
331         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
332                 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
333
334         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
335         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
336     }
337
338     @Test
339     public void testAddShardReplicaFailures() throws Exception {
340         String name = "testAddShardReplicaFailures";
341         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
342                 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
343
344         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
345                 memberNode.operDataStore(), null);
346
347         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
348                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
349         verifyFailedRpcResult(rpcResult);
350
351         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
352                 .build()).get(10, TimeUnit.SECONDS);
353         verifyFailedRpcResult(rpcResult);
354
355         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
356                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
357         verifyFailedRpcResult(rpcResult);
358     }
359
360     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(AbstractDataStore writeToStore,
361             AbstractDataStore readFromStore) throws Exception {
362         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
363         NormalizedNode<?, ?> carsNode = CarsModel.create();
364         writeTx.write(CarsModel.BASE_PATH, carsNode);
365
366         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
367         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
368         assertEquals("canCommit", TRUE, canCommit);
369         cohort.preCommit().get(5, TimeUnit.SECONDS);
370         cohort.commit().get(5, TimeUnit.SECONDS);
371
372         readCarsNodeAndVerify(readFromStore, carsNode);
373         return carsNode;
374     }
375
376     private static void readCarsNodeAndVerify(AbstractDataStore readFromStore,
377             NormalizedNode<?, ?> expCarsNode) throws Exception {
378         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
379                 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
380         assertTrue("isPresent", optional.isPresent());
381         assertEquals("Data node", expCarsNode, optional.get());
382     }
383
384     private void addPrefixShardReplica(final MemberNode memberNode,
385                                        final InstanceIdentifier<?> identifier,
386                                        final BindingNormalizedNodeSerializer serializer,
387                                        final String shardName,
388                                        final String... peerMemberNames) throws Exception {
389
390         final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
391                 .setShardPrefix(identifier)
392                 .setDataStoreType(DataStoreType.Config).build();
393
394         final ClusterAdminRpcService service =
395                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
396
397         final RpcResult<Void> rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS);
398         verifySuccessfulRpcResult(rpcResult);
399
400         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
401         Optional<ActorRef> optional = memberNode.configDataStore().getActorContext().findLocalShard(shardName);
402         assertTrue("Replica shard not present", optional.isPresent());
403     }
404
405     private void removePrefixShardReplica(final MemberNode memberNode,
406                                           final InstanceIdentifier<?> identifier,
407                                           final String removeFromMember,
408                                           final BindingNormalizedNodeSerializer serializer,
409                                           final String shardName,
410                                           final String... peerMemberNames) throws Exception {
411         final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
412                 .setDataStoreType(DataStoreType.Config)
413                 .setShardPrefix(identifier)
414                 .setMemberName(removeFromMember).build();
415
416         final ClusterAdminRpcService service =
417                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
418
419         final RpcResult<Void> rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS);
420         verifySuccessfulRpcResult(rpcResult);
421
422         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
423     }
424
425     private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
426             throws Exception {
427         memberNode.waitForMembersUp(peerMemberNames);
428
429         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
430                 memberNode.operDataStore(), null);
431
432         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
433                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
434         verifySuccessfulRpcResult(rpcResult);
435
436         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
437
438         Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
439         assertFalse("Oper shard present", optional.isPresent());
440
441         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
442                 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
443         verifySuccessfulRpcResult(rpcResult);
444
445         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
446     }
447
448     private static void doMakeShardLeaderLocal(final MemberNode memberNode, String shardName, String newLeader)
449             throws Exception {
450         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
451                 memberNode.operDataStore(), null);
452
453         final RpcResult<Void> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
454                 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
455                 .get(10, TimeUnit.SECONDS);
456
457         verifySuccessfulRpcResult(rpcResult);
458
459         verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
460                 containsString(newLeader)));
461
462     }
463
464     private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
465         if (!rpcResult.isSuccessful()) {
466             if (rpcResult.getErrors().size() > 0) {
467                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
468                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
469             }
470
471             fail("Rpc failed with no error");
472         }
473
474         return rpcResult.getResult();
475     }
476
477     private static void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
478         assertFalse("RpcResult", rpcResult.isSuccessful());
479         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
480         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
481         assertNotNull("RpcResult error message null", error.getMessage());
482     }
483
484     @Test
485     public void testRemoveShardReplica() throws Exception {
486         String name = "testRemoveShardReplica";
487         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
488         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
489                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
490                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
491                 .build();
492
493         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
494                 .moduleShardsConfig(moduleShardsConfig).build();
495
496         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
497                 .moduleShardsConfig(moduleShardsConfig).build();
498
499         leaderNode1.configDataStore().waitTillReady();
500         replicaNode3.configDataStore().waitTillReady();
501         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
502         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
503         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
504
505         // Invoke RPC service on member-3 to remove it's local shard
506
507         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
508                 replicaNode3.operDataStore(), null);
509
510         RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
511                 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
512                 .get(10, TimeUnit.SECONDS);
513         verifySuccessfulRpcResult(rpcResult);
514
515         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
516         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
517         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
518
519         // Restart member-2 and verify member-3 isn't present.
520
521         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
522         replicaNode2.cleanup();
523
524         MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
525                 .moduleShardsConfig(moduleShardsConfig).build();
526
527         newPeplicaNode2.configDataStore().waitTillReady();
528         verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
529
530         // Invoke RPC service on member-1 to remove member-2
531
532         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
533                 leaderNode1.operDataStore(), null);
534
535         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
536                 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
537         verifySuccessfulRpcResult(rpcResult);
538
539         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
540         verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
541     }
542
543     @Test
544     public void testRemoveShardLeaderReplica() throws Exception {
545         String name = "testRemoveShardLeaderReplica";
546         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
547         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
548                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
549                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
550                 .build();
551
552         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
553                 .moduleShardsConfig(moduleShardsConfig).build();
554
555         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
556                 .moduleShardsConfig(moduleShardsConfig).build();
557
558         leaderNode1.configDataStore().waitTillReady();
559         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
560         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
561         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
562
563         replicaNode2.waitForMembersUp("member-1", "member-3");
564         replicaNode3.waitForMembersUp("member-1", "member-2");
565
566         // Invoke RPC service on leader member-1 to remove it's local shard
567
568         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
569                 leaderNode1.operDataStore(), null);
570
571         RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
572                 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
573                 .get(10, TimeUnit.SECONDS);
574         verifySuccessfulRpcResult(rpcResult);
575
576         verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
577                 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
578                         containsString("member-3"))));
579
580         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
581         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
582         verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
583     }
584
585     @Test
586     public void testAddReplicasForAllShards() throws Exception {
587         String name = "testAddReplicasForAllShards";
588         String moduleShardsConfig = "module-shards-member1.conf";
589         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
590                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
591
592         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
593                 "pets", null, Arrays.asList(MEMBER_1));
594         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
595                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
596         leaderNode1.kit().expectMsgClass(Success.class);
597         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
598
599         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
600                 .moduleShardsConfig(moduleShardsConfig).build();
601
602         leaderNode1.waitForMembersUp("member-2");
603         newReplicaNode2.waitForMembersUp("member-1");
604
605         newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
606                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
607         newReplicaNode2.kit().expectMsgClass(Success.class);
608
609         newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
610                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
611                         "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null),
612                                 newReplicaNode2.kit().getRef());
613         newReplicaNode2.kit().expectMsgClass(Success.class);
614
615         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
616                 newReplicaNode2.operDataStore(), null);
617
618         RpcResult<AddReplicasForAllShardsOutput> rpcResult =
619                 service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
620         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
621         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
622                 successShardResult("people", DataStoreType.Config),
623                 successShardResult("pets", DataStoreType.Config),
624                 successShardResult("cars", DataStoreType.Operational),
625                 successShardResult("people", DataStoreType.Operational),
626                 failedShardResult("no-leader", DataStoreType.Operational));
627
628         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
629         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
630         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
631         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
632         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
633     }
634
635     @Test
636     public void testRemoveAllShardReplicas() throws Exception {
637         String name = "testRemoveAllShardReplicas";
638         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
639         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
640                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
641                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
642                 .build();
643
644         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
645                 .moduleShardsConfig(moduleShardsConfig).build();
646
647         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
648                 .moduleShardsConfig(moduleShardsConfig).build();
649
650         leaderNode1.configDataStore().waitTillReady();
651         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
652         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
653         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
654
655         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
656                 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
657         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
658                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
659         leaderNode1.kit().expectMsgClass(Success.class);
660
661         replicaNode2.configDataStore().getActorContext().getShardManager().tell(
662                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
663         replicaNode2.kit().expectMsgClass(Success.class);
664
665         replicaNode3.configDataStore().getActorContext().getShardManager().tell(
666                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
667         replicaNode3.kit().expectMsgClass(Success.class);
668
669         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
670         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
671         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
672
673         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
674                 replicaNode3.operDataStore(), null);
675
676         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
677                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
678         RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
679         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
680                 successShardResult("people", DataStoreType.Config),
681                 successShardResult("pets", DataStoreType.Config),
682                 successShardResult("cars", DataStoreType.Operational),
683                 successShardResult("people", DataStoreType.Operational));
684
685         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
686         verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
687         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
688         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
689         verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
690         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
691         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
692         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
693         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
694     }
695
696     @Test
697     public void testChangeMemberVotingStatesForShard() throws Exception {
698         String name = "testChangeMemberVotingStatusForShard";
699         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
700         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
701                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
702                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
703                 .build();
704
705         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
706                 .moduleShardsConfig(moduleShardsConfig).build();
707
708         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
709                 .moduleShardsConfig(moduleShardsConfig).build();
710
711         leaderNode1.configDataStore().waitTillReady();
712         replicaNode3.configDataStore().waitTillReady();
713         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
714         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
715         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
716
717         // Invoke RPC service on member-3 to change voting status
718
719         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
720                 replicaNode3.operDataStore(), null);
721
722         RpcResult<Void> rpcResult = service3
723                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
724                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
725                         .setMemberVotingState(ImmutableList.of(
726                                 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
727                                 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
728                         .build())
729                 .get(10, TimeUnit.SECONDS);
730         verifySuccessfulRpcResult(rpcResult);
731
732         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
733                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
734         verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
735                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
736         verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
737                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
738     }
739
740     @Test
741     public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
742         String name = "testChangeMemberVotingStatesForSingleNodeShard";
743         String moduleShardsConfig = "module-shards-member1.conf";
744         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
745                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
746                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
747                 .build();
748
749         leaderNode.configDataStore().waitTillReady();
750
751         // Invoke RPC service on member-3 to change voting status
752
753         ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
754                 leaderNode.operDataStore(), null);
755
756         RpcResult<Void> rpcResult = service
757                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
758                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
759                         .setMemberVotingState(ImmutableList
760                                 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
761                         .build())
762                 .get(10, TimeUnit.SECONDS);
763         verifyFailedRpcResult(rpcResult);
764
765         verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
766     }
767
768     @Test
769     public void testChangeMemberVotingStatesForAllShards() throws Exception {
770         String name = "testChangeMemberVotingStatesForAllShards";
771         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
772         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
773                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
774                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
775                 .build();
776
777         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
778                 .moduleShardsConfig(moduleShardsConfig).build();
779
780         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
781                 .moduleShardsConfig(moduleShardsConfig).build();
782
783         leaderNode1.configDataStore().waitTillReady();
784         leaderNode1.operDataStore().waitTillReady();
785         replicaNode3.configDataStore().waitTillReady();
786         replicaNode3.operDataStore().waitTillReady();
787         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
788         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
789         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
790
791         // Invoke RPC service on member-3 to change voting status
792
793         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
794                 replicaNode3.operDataStore(), null);
795
796         RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
797                 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
798                         new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
799                         new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
800                 .get(10, TimeUnit.SECONDS);
801         ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
802         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
803                 successShardResult("people", DataStoreType.Config),
804                 successShardResult("cars", DataStoreType.Operational),
805                 successShardResult("people", DataStoreType.Operational));
806
807         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
808                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
809                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
810                 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
811                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
812     }
813
814     @Test
815     public void testFlipMemberVotingStates() throws Exception {
816         String name = "testFlipMemberVotingStates";
817
818         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
819                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
820                 new ServerInfo("member-3", false)));
821
822         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
823         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
824         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
825
826         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
827         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
828                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
829                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
830                 .build();
831
832         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
833                 .moduleShardsConfig(moduleShardsConfig).build();
834
835         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
836                 .moduleShardsConfig(moduleShardsConfig).build();
837
838         leaderNode1.configDataStore().waitTillReady();
839         leaderNode1.operDataStore().waitTillReady();
840         replicaNode3.configDataStore().waitTillReady();
841         replicaNode3.operDataStore().waitTillReady();
842         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
843                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
844
845         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
846                 replicaNode3.operDataStore(), null);
847
848         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards()
849                 .get(10, TimeUnit.SECONDS);
850         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
851         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
852                 successShardResult("people", DataStoreType.Config),
853                 successShardResult("cars", DataStoreType.Operational),
854                 successShardResult("people", DataStoreType.Operational));
855
856         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
857                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
858                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
859                 new String[]{"cars", "people"},
860                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
861                 new SimpleEntry<>("member-3", TRUE));
862
863         // Leadership should have transferred to member 3 since it is the only remaining voting member.
864         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
865             assertNotNull("Expected non-null leader Id", raftState.getLeader());
866             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
867                     raftState.getLeader().contains("member-3"));
868         });
869
870         verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
871             assertNotNull("Expected non-null leader Id", raftState.getLeader());
872             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
873                     raftState.getLeader().contains("member-3"));
874         });
875
876         // Flip the voting states back to the original states.
877
878         rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
879         result = verifySuccessfulRpcResult(rpcResult);
880         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
881                 successShardResult("people", DataStoreType.Config),
882                 successShardResult("cars", DataStoreType.Operational),
883                 successShardResult("people", DataStoreType.Operational));
884
885         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
886                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
887                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
888                 new String[]{"cars", "people"},
889                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
890                 new SimpleEntry<>("member-3", FALSE));
891
892         // Leadership should have transferred to member 1 or 2.
893         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
894             assertNotNull("Expected non-null leader Id", raftState.getLeader());
895             assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
896                     raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
897         });
898     }
899
900     @Test
901     public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
902         String name = "testFlipMemberVotingStatesWithNoInitialLeader";
903
904         // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
905         // non-voting and simulated as down by not starting them up.
906         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
907                 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
908                 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
909                 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
910
911         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
912         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
913         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
914
915         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
916         final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
917                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
918                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
919                 .build();
920
921         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
922                 .moduleShardsConfig(moduleShardsConfig).build();
923
924         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
925                 .moduleShardsConfig(moduleShardsConfig).build();
926
927         // Initially there won't be a leader b/c all the up nodes are non-voting.
928
929         replicaNode1.waitForMembersUp("member-2", "member-3");
930
931         verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
932                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
933                 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
934                 new SimpleEntry<>("member-6", TRUE));
935
936         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
937             assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
938
939         ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
940                 replicaNode1.operDataStore(), null);
941
942         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
943                 .get(10, TimeUnit.SECONDS);
944         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
945         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
946                 successShardResult("people", DataStoreType.Config),
947                 successShardResult("cars", DataStoreType.Operational),
948                 successShardResult("people", DataStoreType.Operational));
949
950         verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
951                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
952                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
953                 new String[]{"cars", "people"},
954                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
955                 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
956                 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
957
958         // Since member 1 was changed to voting and there was no leader, it should've started and election
959         // and become leader
960         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
961             assertNotNull("Expected non-null leader Id", raftState.getLeader());
962             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
963                     raftState.getLeader().contains("member-1"));
964         });
965
966         verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
967             assertNotNull("Expected non-null leader Id", raftState.getLeader());
968             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
969                     raftState.getLeader().contains("member-1"));
970         });
971     }
972
973     @Test
974     public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
975         String name = "testFlipMemberVotingStatesWithVotingMembersDown";
976
977         // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
978         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
979                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
980                 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
981                 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
982
983         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
984         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
985         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
986
987         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
988         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
989                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
990                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
991                 .build();
992
993         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
994                 .moduleShardsConfig(moduleShardsConfig).build();
995
996         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
997                 .moduleShardsConfig(moduleShardsConfig).build();
998
999         leaderNode1.configDataStore().waitTillReady();
1000         leaderNode1.operDataStore().waitTillReady();
1001         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
1002                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
1003                 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
1004                 new SimpleEntry<>("member-6", FALSE));
1005
1006         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
1007                 leaderNode1.operDataStore(), null);
1008
1009         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
1010                 .get(10, TimeUnit.SECONDS);
1011         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1012         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1013                 successShardResult("people", DataStoreType.Config),
1014                 successShardResult("cars", DataStoreType.Operational),
1015                 successShardResult("people", DataStoreType.Operational));
1016
1017         // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
1018         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
1019                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1020                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1021                 new String[]{"cars", "people"},
1022                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
1023                 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
1024                 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
1025
1026         // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
1027         // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
1028         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1029             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1030             assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
1031         });
1032     }
1033
1034     private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
1035             String member, String datastoreTypeSuffix, String... shards) {
1036         String[] datastoreTypes = {"config_", "oper_"};
1037         for (String type : datastoreTypes) {
1038             for (String shard : shards) {
1039                 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
1040                 for (ServerInfo info : serverConfig.getServerConfig()) {
1041                     newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
1042                             type + datastoreTypeSuffix).toString(), info.isVoting()));
1043                 }
1044
1045                 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
1046                         type + datastoreTypeSuffix).toString();
1047                 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
1048                 InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,
1049                         new ServerConfigurationPayload(newServerInfo)));
1050             }
1051         }
1052     }
1053
1054     @SafeVarargs
1055     private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards,
1056             SimpleEntry<String, Boolean>... expStates) throws Exception {
1057         for (AbstractDataStore datastore: datastores) {
1058             for (String shard: shards) {
1059                 verifyVotingStates(datastore, shard, expStates);
1060             }
1061         }
1062     }
1063
1064     @SafeVarargs
1065     private static void verifyVotingStates(AbstractDataStore datastore, String shardName,
1066             SimpleEntry<String, Boolean>... expStates) throws Exception {
1067         String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
1068         Map<String, Boolean> expStateMap = new HashMap<>();
1069         for (Entry<String, Boolean> e: expStates) {
1070             expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
1071                     datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
1072         }
1073
1074         verifyRaftState(datastore, shardName, raftState -> {
1075             String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
1076                     datastore.getActorContext().getDataStoreName()).toString();
1077             assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
1078             for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1079                 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
1080             }
1081         });
1082     }
1083
1084     private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
1085         Map<String, ShardResult> expResultsMap = new HashMap<>();
1086         for (ShardResult r: expShardResults) {
1087             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1088         }
1089
1090         for (ShardResult result: shardResults) {
1091             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1092             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1093                     result.getDataStoreType()), exp);
1094             assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
1095             if (exp.isSucceeded()) {
1096                 assertNull("Expected null error message", result.getErrorMessage());
1097             } else {
1098                 assertNotNull("Expected error message", result.getErrorMessage());
1099             }
1100         }
1101
1102         if (!expResultsMap.isEmpty()) {
1103             fail("Missing shard results for " + expResultsMap.keySet());
1104         }
1105     }
1106
1107     private static ShardResult successShardResult(String shardName, DataStoreType type) {
1108         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1109     }
1110
1111     private static ShardResult failedShardResult(String shardName, DataStoreType type) {
1112         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();
1113     }
1114 }