BUG 8447: Add shard getRole rpcs
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertFalse;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertThat;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
24
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Sets;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.net.URI;
38 import java.util.AbstractMap.SimpleEntry;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Map.Entry;
47 import java.util.Set;
48 import java.util.concurrent.TimeUnit;
49 import org.apache.commons.lang3.SerializationUtils;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.Mockito;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
57 import org.opendaylight.controller.cluster.datastore.MemberNode;
58 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
66 import org.opendaylight.controller.cluster.raft.RaftState;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
74 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
75 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
76 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
77 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
78 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
79 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
80 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.People;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInputBuilder;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInputBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
108 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
109 import org.opendaylight.yangtools.yang.common.RpcError;
110 import org.opendaylight.yangtools.yang.common.RpcResult;
111 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
112
113 /**
114  * Unit tests for ClusterAdminRpcService.
115  *
116  * @author Thomas Pantelis
117  */
118 public class ClusterAdminRpcServiceTest {
119     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
120     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
121     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
122     private final List<MemberNode> memberNodes = new ArrayList<>();
123
124     @Before
125     public void setUp() {
126         InMemoryJournal.clear();
127         InMemorySnapshotStore.clear();
128     }
129
130     @After
131     public void tearDown() {
132         for (MemberNode m : Lists.reverse(memberNodes)) {
133             m.cleanup();
134         }
135         memberNodes.clear();
136     }
137
138     @Test
139     public void testBackupDatastore() throws Exception {
140         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
141                 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
142                 .testName("testBackupDatastore").build();
143
144         String fileName = "target/testBackupDatastore";
145         new File(fileName).delete();
146
147         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
148
149         RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
150                 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
151         verifySuccessfulRpcResult(rpcResult);
152
153         try (FileInputStream fis = new FileInputStream(fileName)) {
154             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
155             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
156
157             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
158                     snapshots.get(1).getType(), snapshots.get(1));
159             verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(),
160                     map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people");
161         } finally {
162             new File(fileName).delete();
163         }
164
165         // Test failure by killing a shard.
166
167         node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder()
168                 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
169
170         ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
171         node.kit().watch(carsShardActor);
172         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
173         node.kit().expectTerminated(carsShardActor);
174
175         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
176                 .get(5, TimeUnit.SECONDS);
177         assertFalse("isSuccessful", rpcResult.isSuccessful());
178         assertEquals("getErrors", 1, rpcResult.getErrors().size());
179     }
180
181     private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot,
182             String... expShardNames) {
183         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
184         Set<String> shardNames = new HashSet<>();
185         for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
186             shardNames.add(s.getName());
187         }
188
189         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
190     }
191
192     @Test
193     public void testAddRemovePrefixShardReplica() throws Exception {
194         String name = "testAddPrefixShardReplica";
195         String moduleShardsConfig = "module-shards-default.conf";
196
197         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
198                 .moduleShardsConfig(moduleShardsConfig).build();
199         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
200                 .moduleShardsConfig(moduleShardsConfig).build();
201         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
202                 .moduleShardsConfig(moduleShardsConfig).build();
203
204         member1.waitForMembersUp("member-2", "member-3");
205         replicaNode2.kit().waitForMembersUp("member-1", "member-3");
206         replicaNode3.kit().waitForMembersUp("member-1", "member-2");
207
208         final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
209
210         shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
211                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
212                         "prefix", Collections.singleton(MEMBER_1))),
213                 ActorRef.noSender());
214
215         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
216                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
217
218         final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
219         final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
220         Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
221
222         addPrefixShardReplica(replicaNode2, identifier, serializer,
223                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
224
225         addPrefixShardReplica(replicaNode3, identifier, serializer,
226                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
227
228         verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
229                 "member-2", "member-3");
230
231         removePrefixShardReplica(member1, identifier, "member-3", serializer,
232                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
233
234         verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
235         verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
236                 "member-1");
237
238         removePrefixShardReplica(member1, identifier, "member-2", serializer,
239                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
240
241         verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
242     }
243
244     @Test
245     public void testGetShardRole() throws Exception {
246         String name = "testGetShardRole";
247         String moduleShardsConfig = "module-shards-default-member-1.conf";
248
249         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
250                 .moduleShardsConfig(moduleShardsConfig).build();
251
252         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "default");
253
254         final RpcResult<GetShardRoleOutput> successResult =
255                 getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "default");
256         verifySuccessfulRpcResult(successResult);
257         assertEquals("Leader", successResult.getResult().getRole());
258
259         final RpcResult<GetShardRoleOutput> failedResult =
260                 getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "cars");
261
262         verifyFailedRpcResult(failedResult);
263
264         final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
265
266         shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
267                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
268                         "prefix", Collections.singleton(MEMBER_1))),
269                 ActorRef.noSender());
270
271         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
272                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
273
274         final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
275         final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
276         Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
277
278         final RpcResult<GetPrefixShardRoleOutput> prefixSuccessResult =
279                 getPrefixShardRole(member1, identifier, serializer);
280
281         verifySuccessfulRpcResult(prefixSuccessResult);
282         assertEquals("Leader", prefixSuccessResult.getResult().getRole());
283
284         final InstanceIdentifier<People> peopleId = InstanceIdentifier.create(People.class);
285         Mockito.doReturn(PeopleModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(peopleId);
286
287         final RpcResult<GetPrefixShardRoleOutput> prefixFail =
288                 getPrefixShardRole(member1, peopleId, serializer);
289
290         verifyFailedRpcResult(prefixFail);
291     }
292
293     @Test
294     public void testGetPrefixShardRole() throws Exception {
295         String name = "testGetPrefixShardRole";
296         String moduleShardsConfig = "module-shards-default-member-1.conf";
297
298         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
299                 .moduleShardsConfig(moduleShardsConfig).build();
300
301         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "default");
302
303
304     }
305
306     @Test
307     public void testModuleShardLeaderMovement() throws Exception {
308         String name = "testModuleShardLeaderMovement";
309         String moduleShardsConfig = "module-shards-member1.conf";
310
311         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
312                 .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
313         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
314                 .moduleShardsConfig(moduleShardsConfig).build();
315         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
316                 .moduleShardsConfig(moduleShardsConfig).build();
317
318         member1.waitForMembersUp("member-2", "member-3");
319         replicaNode2.waitForMembersUp("member-1");
320         replicaNode3.waitForMembersUp("member-1", "member-2");
321
322         doAddShardReplica(replicaNode2, "cars", "member-1");
323         doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
324
325         verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
326
327         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
328
329         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
330
331         doMakeShardLeaderLocal(member1, "cars", "member-1");
332         verifyRaftState(replicaNode2.configDataStore(), "cars",
333             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
334         verifyRaftState(replicaNode3.configDataStore(), "cars",
335             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
336
337         doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
338         verifyRaftState(member1.configDataStore(), "cars",
339             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
340         verifyRaftState(replicaNode3.configDataStore(), "cars",
341             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
342
343         replicaNode2.waitForMembersUp("member-3");
344         doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
345     }
346
347     @Test
348     public void testAddShardReplica() throws Exception {
349         String name = "testAddShardReplica";
350         String moduleShardsConfig = "module-shards-cars-member-1.conf";
351         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
352                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
353
354         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
355                 .moduleShardsConfig(moduleShardsConfig).build();
356
357         leaderNode1.waitForMembersUp("member-2");
358
359         doAddShardReplica(newReplicaNode2, "cars", "member-1");
360
361         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
362                 .moduleShardsConfig(moduleShardsConfig).build();
363
364         leaderNode1.waitForMembersUp("member-3");
365         newReplicaNode2.waitForMembersUp("member-3");
366
367         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
368
369         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
370         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
371
372         // Write data to member-2's config datastore and read/verify via member-3
373         final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
374                 newReplicaNode3.configDataStore());
375
376         // Write data to member-3's oper datastore and read/verify via member-2
377         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
378
379         // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
380         // 2 ServerConfigurationPayload entries and the transaction payload entry.
381
382         RaftStateVerifier verifier = raftState -> {
383             assertEquals("Commit index", 2, raftState.getCommitIndex());
384             assertEquals("Last applied index", 2, raftState.getLastApplied());
385         };
386
387         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
388         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
389
390         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
391         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
392
393         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
394         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
395
396         // Restart member-3 and verify the cars config shard is re-instated.
397
398         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
399         newReplicaNode3.cleanup();
400
401         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
402                 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
403
404         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
405         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
406     }
407
408     @Test
409     public void testAddShardReplicaFailures() throws Exception {
410         String name = "testAddShardReplicaFailures";
411         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
412                 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
413
414         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
415                 memberNode.operDataStore(), null);
416
417         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
418                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
419         verifyFailedRpcResult(rpcResult);
420
421         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
422                 .build()).get(10, TimeUnit.SECONDS);
423         verifyFailedRpcResult(rpcResult);
424
425         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
426                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
427         verifyFailedRpcResult(rpcResult);
428     }
429
430     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(AbstractDataStore writeToStore,
431             AbstractDataStore readFromStore) throws Exception {
432         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
433         NormalizedNode<?, ?> carsNode = CarsModel.create();
434         writeTx.write(CarsModel.BASE_PATH, carsNode);
435
436         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
437         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
438         assertEquals("canCommit", TRUE, canCommit);
439         cohort.preCommit().get(5, TimeUnit.SECONDS);
440         cohort.commit().get(5, TimeUnit.SECONDS);
441
442         readCarsNodeAndVerify(readFromStore, carsNode);
443         return carsNode;
444     }
445
446     private static void readCarsNodeAndVerify(AbstractDataStore readFromStore,
447             NormalizedNode<?, ?> expCarsNode) throws Exception {
448         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
449                 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
450         assertTrue("isPresent", optional.isPresent());
451         assertEquals("Data node", expCarsNode, optional.get());
452     }
453
454     private RpcResult<GetShardRoleOutput> getShardRole(final MemberNode memberNode,
455                                                        final BindingNormalizedNodeSerializer serializer,
456                                                        final String shardName) throws Exception {
457
458         final GetShardRoleInput input = new GetShardRoleInputBuilder()
459                 .setDataStoreType(DataStoreType.Config)
460                 .setShardName(shardName)
461                 .build();
462
463         final ClusterAdminRpcService service =
464                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
465
466         return service.getShardRole(input).get(10, TimeUnit.SECONDS);
467
468     }
469
470     private RpcResult<GetPrefixShardRoleOutput> getPrefixShardRole(
471             final MemberNode memberNode,
472             final InstanceIdentifier<?> identifier,
473             final BindingNormalizedNodeSerializer serializer) throws Exception {
474
475         final GetPrefixShardRoleInput input = new GetPrefixShardRoleInputBuilder()
476                 .setDataStoreType(DataStoreType.Config)
477                 .setShardPrefix(identifier)
478                 .build();
479
480         final ClusterAdminRpcService service =
481                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
482
483         return service.getPrefixShardRole(input).get(10, TimeUnit.SECONDS);
484
485     }
486
487     private void addPrefixShardReplica(final MemberNode memberNode,
488                                        final InstanceIdentifier<?> identifier,
489                                        final BindingNormalizedNodeSerializer serializer,
490                                        final String shardName,
491                                        final String... peerMemberNames) throws Exception {
492
493         final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
494                 .setShardPrefix(identifier)
495                 .setDataStoreType(DataStoreType.Config).build();
496
497         final ClusterAdminRpcService service =
498                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
499
500         final RpcResult<Void> rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS);
501         verifySuccessfulRpcResult(rpcResult);
502
503         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
504         Optional<ActorRef> optional = memberNode.configDataStore().getActorContext().findLocalShard(shardName);
505         assertTrue("Replica shard not present", optional.isPresent());
506     }
507
508     private void removePrefixShardReplica(final MemberNode memberNode,
509                                           final InstanceIdentifier<?> identifier,
510                                           final String removeFromMember,
511                                           final BindingNormalizedNodeSerializer serializer,
512                                           final String shardName,
513                                           final String... peerMemberNames) throws Exception {
514         final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
515                 .setDataStoreType(DataStoreType.Config)
516                 .setShardPrefix(identifier)
517                 .setMemberName(removeFromMember).build();
518
519         final ClusterAdminRpcService service =
520                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
521
522         final RpcResult<Void> rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS);
523         verifySuccessfulRpcResult(rpcResult);
524
525         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
526     }
527
528     private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
529             throws Exception {
530         memberNode.waitForMembersUp(peerMemberNames);
531
532         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
533                 memberNode.operDataStore(), null);
534
535         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
536                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
537         verifySuccessfulRpcResult(rpcResult);
538
539         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
540
541         Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
542         assertFalse("Oper shard present", optional.isPresent());
543
544         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
545                 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
546         verifySuccessfulRpcResult(rpcResult);
547
548         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
549     }
550
551     private static void doMakeShardLeaderLocal(final MemberNode memberNode, String shardName, String newLeader)
552             throws Exception {
553         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
554                 memberNode.operDataStore(), null);
555
556         final RpcResult<Void> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
557                 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
558                 .get(10, TimeUnit.SECONDS);
559
560         verifySuccessfulRpcResult(rpcResult);
561
562         verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
563                 containsString(newLeader)));
564
565     }
566
567     private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
568         if (!rpcResult.isSuccessful()) {
569             if (rpcResult.getErrors().size() > 0) {
570                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
571                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
572             }
573
574             fail("Rpc failed with no error");
575         }
576
577         return rpcResult.getResult();
578     }
579
580     private static void verifyFailedRpcResult(RpcResult<?> rpcResult) {
581         assertFalse("RpcResult", rpcResult.isSuccessful());
582         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
583         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
584         assertNotNull("RpcResult error message null", error.getMessage());
585     }
586
587     @Test
588     public void testRemoveShardReplica() throws Exception {
589         String name = "testRemoveShardReplica";
590         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
591         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
592                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
593                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
594                 .build();
595
596         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
597                 .moduleShardsConfig(moduleShardsConfig).build();
598
599         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
600                 .moduleShardsConfig(moduleShardsConfig).build();
601
602         leaderNode1.configDataStore().waitTillReady();
603         replicaNode3.configDataStore().waitTillReady();
604         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
605         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
606         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
607
608         // Invoke RPC service on member-3 to remove it's local shard
609
610         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
611                 replicaNode3.operDataStore(), null);
612
613         RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
614                 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
615                 .get(10, TimeUnit.SECONDS);
616         verifySuccessfulRpcResult(rpcResult);
617
618         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
619         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
620         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
621
622         // Restart member-2 and verify member-3 isn't present.
623
624         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
625         replicaNode2.cleanup();
626
627         MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
628                 .moduleShardsConfig(moduleShardsConfig).build();
629
630         newPeplicaNode2.configDataStore().waitTillReady();
631         verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
632
633         // Invoke RPC service on member-1 to remove member-2
634
635         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
636                 leaderNode1.operDataStore(), null);
637
638         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
639                 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
640         verifySuccessfulRpcResult(rpcResult);
641
642         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
643         verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
644     }
645
646     @Test
647     public void testRemoveShardLeaderReplica() throws Exception {
648         String name = "testRemoveShardLeaderReplica";
649         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
650         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
651                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
652                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
653                 .build();
654
655         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
656                 .moduleShardsConfig(moduleShardsConfig).build();
657
658         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
659                 .moduleShardsConfig(moduleShardsConfig).build();
660
661         leaderNode1.configDataStore().waitTillReady();
662         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
663         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
664         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
665
666         replicaNode2.waitForMembersUp("member-1", "member-3");
667         replicaNode3.waitForMembersUp("member-1", "member-2");
668
669         // Invoke RPC service on leader member-1 to remove it's local shard
670
671         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
672                 leaderNode1.operDataStore(), null);
673
674         RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
675                 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
676                 .get(10, TimeUnit.SECONDS);
677         verifySuccessfulRpcResult(rpcResult);
678
679         verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
680                 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
681                         containsString("member-3"))));
682
683         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
684         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
685         verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
686     }
687
688     @Test
689     public void testAddReplicasForAllShards() throws Exception {
690         String name = "testAddReplicasForAllShards";
691         String moduleShardsConfig = "module-shards-member1.conf";
692         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
693                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
694
695         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
696                 "pets", null, Arrays.asList(MEMBER_1));
697         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
698                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
699         leaderNode1.kit().expectMsgClass(Success.class);
700         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
701
702         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
703                 .moduleShardsConfig(moduleShardsConfig).build();
704
705         leaderNode1.waitForMembersUp("member-2");
706         newReplicaNode2.waitForMembersUp("member-1");
707
708         newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
709                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
710         newReplicaNode2.kit().expectMsgClass(Success.class);
711
712         newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
713                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
714                         "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null),
715                                 newReplicaNode2.kit().getRef());
716         newReplicaNode2.kit().expectMsgClass(Success.class);
717
718         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
719                 newReplicaNode2.operDataStore(), null);
720
721         RpcResult<AddReplicasForAllShardsOutput> rpcResult =
722                 service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
723         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
724         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
725                 successShardResult("people", DataStoreType.Config),
726                 successShardResult("pets", DataStoreType.Config),
727                 successShardResult("cars", DataStoreType.Operational),
728                 successShardResult("people", DataStoreType.Operational),
729                 failedShardResult("no-leader", DataStoreType.Operational));
730
731         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
732         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
733         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
734         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
735         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
736     }
737
738     @Test
739     public void testRemoveAllShardReplicas() throws Exception {
740         String name = "testRemoveAllShardReplicas";
741         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
742         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
743                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
744                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
745                 .build();
746
747         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
748                 .moduleShardsConfig(moduleShardsConfig).build();
749
750         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
751                 .moduleShardsConfig(moduleShardsConfig).build();
752
753         leaderNode1.configDataStore().waitTillReady();
754         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
755         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
756         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
757
758         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
759                 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
760         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
761                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
762         leaderNode1.kit().expectMsgClass(Success.class);
763
764         replicaNode2.configDataStore().getActorContext().getShardManager().tell(
765                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
766         replicaNode2.kit().expectMsgClass(Success.class);
767
768         replicaNode3.configDataStore().getActorContext().getShardManager().tell(
769                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
770         replicaNode3.kit().expectMsgClass(Success.class);
771
772         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
773         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
774         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
775
776         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
777                 replicaNode3.operDataStore(), null);
778
779         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
780                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
781         RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
782         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
783                 successShardResult("people", DataStoreType.Config),
784                 successShardResult("pets", DataStoreType.Config),
785                 successShardResult("cars", DataStoreType.Operational),
786                 successShardResult("people", DataStoreType.Operational));
787
788         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
789         verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
790         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
791         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
792         verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
793         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
794         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
795         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
796         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
797     }
798
799     @Test
800     public void testChangeMemberVotingStatesForShard() throws Exception {
801         String name = "testChangeMemberVotingStatusForShard";
802         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
803         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
804                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
805                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
806                 .build();
807
808         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
809                 .moduleShardsConfig(moduleShardsConfig).build();
810
811         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
812                 .moduleShardsConfig(moduleShardsConfig).build();
813
814         leaderNode1.configDataStore().waitTillReady();
815         replicaNode3.configDataStore().waitTillReady();
816         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
817         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
818         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
819
820         // Invoke RPC service on member-3 to change voting status
821
822         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
823                 replicaNode3.operDataStore(), null);
824
825         RpcResult<Void> rpcResult = service3
826                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
827                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
828                         .setMemberVotingState(ImmutableList.of(
829                                 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
830                                 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
831                         .build())
832                 .get(10, TimeUnit.SECONDS);
833         verifySuccessfulRpcResult(rpcResult);
834
835         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
836                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
837         verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
838                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
839         verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
840                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
841     }
842
843     @Test
844     public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
845         String name = "testChangeMemberVotingStatesForSingleNodeShard";
846         String moduleShardsConfig = "module-shards-member1.conf";
847         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
848                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
849                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
850                 .build();
851
852         leaderNode.configDataStore().waitTillReady();
853
854         // Invoke RPC service on member-3 to change voting status
855
856         ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
857                 leaderNode.operDataStore(), null);
858
859         RpcResult<Void> rpcResult = service
860                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
861                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
862                         .setMemberVotingState(ImmutableList
863                                 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
864                         .build())
865                 .get(10, TimeUnit.SECONDS);
866         verifyFailedRpcResult(rpcResult);
867
868         verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
869     }
870
871     @Test
872     public void testChangeMemberVotingStatesForAllShards() throws Exception {
873         String name = "testChangeMemberVotingStatesForAllShards";
874         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
875         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
876                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
877                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
878                 .build();
879
880         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
881                 .moduleShardsConfig(moduleShardsConfig).build();
882
883         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
884                 .moduleShardsConfig(moduleShardsConfig).build();
885
886         leaderNode1.configDataStore().waitTillReady();
887         leaderNode1.operDataStore().waitTillReady();
888         replicaNode3.configDataStore().waitTillReady();
889         replicaNode3.operDataStore().waitTillReady();
890         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
891         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
892         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
893
894         // Invoke RPC service on member-3 to change voting status
895
896         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
897                 replicaNode3.operDataStore(), null);
898
899         RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
900                 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
901                         new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
902                         new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
903                 .get(10, TimeUnit.SECONDS);
904         ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
905         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
906                 successShardResult("people", DataStoreType.Config),
907                 successShardResult("cars", DataStoreType.Operational),
908                 successShardResult("people", DataStoreType.Operational));
909
910         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
911                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
912                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
913                 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
914                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
915     }
916
917     @Test
918     public void testFlipMemberVotingStates() throws Exception {
919         String name = "testFlipMemberVotingStates";
920
921         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
922                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
923                 new ServerInfo("member-3", false)));
924
925         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
926         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
927         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
928
929         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
930         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
931                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
932                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
933                 .build();
934
935         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
936                 .moduleShardsConfig(moduleShardsConfig).build();
937
938         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
939                 .moduleShardsConfig(moduleShardsConfig).build();
940
941         leaderNode1.configDataStore().waitTillReady();
942         leaderNode1.operDataStore().waitTillReady();
943         replicaNode3.configDataStore().waitTillReady();
944         replicaNode3.operDataStore().waitTillReady();
945         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
946                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
947
948         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
949                 replicaNode3.operDataStore(), null);
950
951         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards()
952                 .get(10, TimeUnit.SECONDS);
953         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
954         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
955                 successShardResult("people", DataStoreType.Config),
956                 successShardResult("cars", DataStoreType.Operational),
957                 successShardResult("people", DataStoreType.Operational));
958
959         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
960                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
961                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
962                 new String[]{"cars", "people"},
963                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
964                 new SimpleEntry<>("member-3", TRUE));
965
966         // Leadership should have transferred to member 3 since it is the only remaining voting member.
967         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
968             assertNotNull("Expected non-null leader Id", raftState.getLeader());
969             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
970                     raftState.getLeader().contains("member-3"));
971         });
972
973         verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
974             assertNotNull("Expected non-null leader Id", raftState.getLeader());
975             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
976                     raftState.getLeader().contains("member-3"));
977         });
978
979         // Flip the voting states back to the original states.
980
981         rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
982         result = verifySuccessfulRpcResult(rpcResult);
983         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
984                 successShardResult("people", DataStoreType.Config),
985                 successShardResult("cars", DataStoreType.Operational),
986                 successShardResult("people", DataStoreType.Operational));
987
988         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
989                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
990                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
991                 new String[]{"cars", "people"},
992                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
993                 new SimpleEntry<>("member-3", FALSE));
994
995         // Leadership should have transferred to member 1 or 2.
996         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
997             assertNotNull("Expected non-null leader Id", raftState.getLeader());
998             assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
999                     raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
1000         });
1001     }
1002
1003     @Test
1004     public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
1005         String name = "testFlipMemberVotingStatesWithNoInitialLeader";
1006
1007         // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
1008         // non-voting and simulated as down by not starting them up.
1009         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1010                 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
1011                 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
1012                 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
1013
1014         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
1015         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
1016         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
1017
1018         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
1019         final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
1020                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
1021                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
1022                 .build();
1023
1024         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
1025                 .moduleShardsConfig(moduleShardsConfig).build();
1026
1027         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
1028                 .moduleShardsConfig(moduleShardsConfig).build();
1029
1030         // Initially there won't be a leader b/c all the up nodes are non-voting.
1031
1032         replicaNode1.waitForMembersUp("member-2", "member-3");
1033
1034         verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
1035                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
1036                 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
1037                 new SimpleEntry<>("member-6", TRUE));
1038
1039         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
1040             assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
1041
1042         ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
1043                 replicaNode1.operDataStore(), null);
1044
1045         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
1046                 .get(10, TimeUnit.SECONDS);
1047         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1048         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1049                 successShardResult("people", DataStoreType.Config),
1050                 successShardResult("cars", DataStoreType.Operational),
1051                 successShardResult("people", DataStoreType.Operational));
1052
1053         verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
1054                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1055                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1056                 new String[]{"cars", "people"},
1057                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
1058                 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
1059                 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
1060
1061         // Since member 1 was changed to voting and there was no leader, it should've started and election
1062         // and become leader
1063         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
1064             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1065             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
1066                     raftState.getLeader().contains("member-1"));
1067         });
1068
1069         verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
1070             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1071             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
1072                     raftState.getLeader().contains("member-1"));
1073         });
1074     }
1075
1076     @Test
1077     public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
1078         String name = "testFlipMemberVotingStatesWithVotingMembersDown";
1079
1080         // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
1081         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1082                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
1083                 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
1084                 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
1085
1086         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
1087         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
1088         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
1089
1090         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
1091         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
1092                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
1093                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
1094                 .build();
1095
1096         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
1097                 .moduleShardsConfig(moduleShardsConfig).build();
1098
1099         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
1100                 .moduleShardsConfig(moduleShardsConfig).build();
1101
1102         leaderNode1.configDataStore().waitTillReady();
1103         leaderNode1.operDataStore().waitTillReady();
1104         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
1105                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
1106                 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
1107                 new SimpleEntry<>("member-6", FALSE));
1108
1109         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
1110                 leaderNode1.operDataStore(), null);
1111
1112         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
1113                 .get(10, TimeUnit.SECONDS);
1114         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1115         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1116                 successShardResult("people", DataStoreType.Config),
1117                 successShardResult("cars", DataStoreType.Operational),
1118                 successShardResult("people", DataStoreType.Operational));
1119
1120         // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
1121         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
1122                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1123                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1124                 new String[]{"cars", "people"},
1125                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
1126                 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
1127                 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
1128
1129         // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
1130         // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
1131         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1132             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1133             assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
1134         });
1135     }
1136
1137     private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
1138             String member, String datastoreTypeSuffix, String... shards) {
1139         String[] datastoreTypes = {"config_", "oper_"};
1140         for (String type : datastoreTypes) {
1141             for (String shard : shards) {
1142                 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
1143                 for (ServerInfo info : serverConfig.getServerConfig()) {
1144                     newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
1145                             type + datastoreTypeSuffix).toString(), info.isVoting()));
1146                 }
1147
1148                 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
1149                         type + datastoreTypeSuffix).toString();
1150                 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
1151                 InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,
1152                         new ServerConfigurationPayload(newServerInfo)));
1153             }
1154         }
1155     }
1156
1157     @SafeVarargs
1158     private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards,
1159             SimpleEntry<String, Boolean>... expStates) throws Exception {
1160         for (AbstractDataStore datastore: datastores) {
1161             for (String shard: shards) {
1162                 verifyVotingStates(datastore, shard, expStates);
1163             }
1164         }
1165     }
1166
1167     @SafeVarargs
1168     private static void verifyVotingStates(AbstractDataStore datastore, String shardName,
1169             SimpleEntry<String, Boolean>... expStates) throws Exception {
1170         String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
1171         Map<String, Boolean> expStateMap = new HashMap<>();
1172         for (Entry<String, Boolean> e: expStates) {
1173             expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
1174                     datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
1175         }
1176
1177         verifyRaftState(datastore, shardName, raftState -> {
1178             String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
1179                     datastore.getActorContext().getDataStoreName()).toString();
1180             assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
1181             for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1182                 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
1183             }
1184         });
1185     }
1186
1187     private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
1188         Map<String, ShardResult> expResultsMap = new HashMap<>();
1189         for (ShardResult r: expShardResults) {
1190             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1191         }
1192
1193         for (ShardResult result: shardResults) {
1194             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1195             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1196                     result.getDataStoreType()), exp);
1197             assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
1198             if (exp.isSucceeded()) {
1199                 assertNull("Expected null error message", result.getErrorMessage());
1200             } else {
1201                 assertNotNull("Expected error message", result.getErrorMessage());
1202             }
1203         }
1204
1205         if (!expResultsMap.isEmpty()) {
1206             fail("Missing shard results for " + expResultsMap.keySet());
1207         }
1208     }
1209
1210     private static ShardResult successShardResult(String shardName, DataStoreType type) {
1211         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1212     }
1213
1214     private static ShardResult failedShardResult(String shardName, DataStoreType type) {
1215         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();
1216     }
1217 }