Convert CDS implementation to use mdsal APIs
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertFalse;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertThat;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
24
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Sets;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.net.URI;
38 import java.util.AbstractMap.SimpleEntry;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Map.Entry;
47 import java.util.Set;
48 import java.util.concurrent.TimeUnit;
49 import org.apache.commons.lang3.SerializationUtils;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.Mockito;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
57 import org.opendaylight.controller.cluster.datastore.MemberNode;
58 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
66 import org.opendaylight.controller.cluster.raft.RaftState;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
74 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
75 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
76 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
77 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
78 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
79 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
80 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.People;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInputBuilder;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInputBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
108 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
109 import org.opendaylight.yangtools.yang.common.RpcError;
110 import org.opendaylight.yangtools.yang.common.RpcResult;
111 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
112
113 /**
114  * Unit tests for ClusterAdminRpcService.
115  *
116  * @author Thomas Pantelis
117  */
118 public class ClusterAdminRpcServiceTest {
119     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
120     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
121     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
122     private final List<MemberNode> memberNodes = new ArrayList<>();
123
124     @Before
125     public void setUp() {
126         InMemoryJournal.clear();
127         InMemorySnapshotStore.clear();
128     }
129
130     @After
131     public void tearDown() {
132         for (MemberNode m : Lists.reverse(memberNodes)) {
133             m.cleanup();
134         }
135         memberNodes.clear();
136     }
137
138     @Test
139     public void testBackupDatastore() throws Exception {
140         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
141                 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
142                 .testName("testBackupDatastore").build();
143
144         String fileName = "target/testBackupDatastore";
145         new File(fileName).delete();
146
147         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
148
149         RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
150                 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
151         verifySuccessfulRpcResult(rpcResult);
152
153         try (FileInputStream fis = new FileInputStream(fileName)) {
154             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
155             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
156
157             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
158                     snapshots.get(1).getType(), snapshots.get(1));
159             verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(),
160                     map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people");
161         } finally {
162             new File(fileName).delete();
163         }
164
165         // Test failure by killing a shard.
166
167         node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder()
168                 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
169
170         ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
171         node.kit().watch(carsShardActor);
172         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
173         node.kit().expectTerminated(carsShardActor);
174
175         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
176                 .get(5, TimeUnit.SECONDS);
177         assertFalse("isSuccessful", rpcResult.isSuccessful());
178         assertEquals("getErrors", 1, rpcResult.getErrors().size());
179     }
180
181     private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot,
182             String... expShardNames) {
183         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
184         Set<String> shardNames = new HashSet<>();
185         for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
186             shardNames.add(s.getName());
187         }
188
189         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
190     }
191
192     @Test
193     public void testAddRemovePrefixShardReplica() throws Exception {
194         String name = "testAddPrefixShardReplica";
195         String moduleShardsConfig = "module-shards-default.conf";
196
197         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
198                 .moduleShardsConfig(moduleShardsConfig).build();
199         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
200                 .moduleShardsConfig(moduleShardsConfig).build();
201         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
202                 .moduleShardsConfig(moduleShardsConfig).build();
203
204         member1.waitForMembersUp("member-2", "member-3");
205         replicaNode2.kit().waitForMembersUp("member-1", "member-3");
206         replicaNode3.kit().waitForMembersUp("member-1", "member-2");
207
208         final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
209
210         shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
211                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
212                         "prefix", Collections.singleton(MEMBER_1))),
213                 ActorRef.noSender());
214
215         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
216                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
217
218         final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
219         final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
220         Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
221
222         addPrefixShardReplica(replicaNode2, identifier, serializer,
223                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
224
225         addPrefixShardReplica(replicaNode3, identifier, serializer,
226                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
227
228         verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
229                 "member-2", "member-3");
230
231         removePrefixShardReplica(member1, identifier, "member-3", serializer,
232                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
233
234         verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
235         verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
236                 "member-1");
237
238         removePrefixShardReplica(member1, identifier, "member-2", serializer,
239                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
240
241         verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
242     }
243
244     @Test
245     public void testGetShardRole() throws Exception {
246         String name = "testGetShardRole";
247         String moduleShardsConfig = "module-shards-default-member-1.conf";
248
249         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
250                 .moduleShardsConfig(moduleShardsConfig).build();
251
252         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "default");
253
254         final RpcResult<GetShardRoleOutput> successResult =
255                 getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "default");
256         verifySuccessfulRpcResult(successResult);
257         assertEquals("Leader", successResult.getResult().getRole());
258
259         final RpcResult<GetShardRoleOutput> failedResult =
260                 getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "cars");
261
262         verifyFailedRpcResult(failedResult);
263
264         final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
265
266         shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
267                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
268                         "prefix", Collections.singleton(MEMBER_1))),
269                 ActorRef.noSender());
270
271         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
272                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
273
274         final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
275         final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
276         Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
277
278         final RpcResult<GetPrefixShardRoleOutput> prefixSuccessResult =
279                 getPrefixShardRole(member1, identifier, serializer);
280
281         verifySuccessfulRpcResult(prefixSuccessResult);
282         assertEquals("Leader", prefixSuccessResult.getResult().getRole());
283
284         final InstanceIdentifier<People> peopleId = InstanceIdentifier.create(People.class);
285         Mockito.doReturn(PeopleModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(peopleId);
286
287         final RpcResult<GetPrefixShardRoleOutput> prefixFail =
288                 getPrefixShardRole(member1, peopleId, serializer);
289
290         verifyFailedRpcResult(prefixFail);
291     }
292
293     @Test
294     public void testGetPrefixShardRole() throws Exception {
295         String name = "testGetPrefixShardRole";
296         String moduleShardsConfig = "module-shards-default-member-1.conf";
297
298         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
299                 .moduleShardsConfig(moduleShardsConfig).build();
300
301         member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "default");
302
303
304     }
305
306     @Test
307     public void testModuleShardLeaderMovement() throws Exception {
308         String name = "testModuleShardLeaderMovement";
309         String moduleShardsConfig = "module-shards-member1.conf";
310
311         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
312                 .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
313         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
314                 .moduleShardsConfig(moduleShardsConfig).build();
315         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
316                 .moduleShardsConfig(moduleShardsConfig).build();
317
318         member1.waitForMembersUp("member-2", "member-3");
319         replicaNode2.waitForMembersUp("member-1");
320         replicaNode3.waitForMembersUp("member-1", "member-2");
321
322         doAddShardReplica(replicaNode2, "cars", "member-1");
323         doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
324
325         verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
326
327         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
328
329         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
330
331         doMakeShardLeaderLocal(member1, "cars", "member-1");
332         verifyRaftState(replicaNode2.configDataStore(), "cars",
333             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
334         verifyRaftState(replicaNode3.configDataStore(), "cars",
335             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
336
337         doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
338         verifyRaftState(member1.configDataStore(), "cars",
339             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
340         verifyRaftState(replicaNode3.configDataStore(), "cars",
341             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
342
343         replicaNode2.waitForMembersUp("member-3");
344         doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
345     }
346
347     @Test
348     public void testAddShardReplica() throws Exception {
349         String name = "testAddShardReplica";
350         String moduleShardsConfig = "module-shards-cars-member-1.conf";
351         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
352                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
353
354         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
355                 .moduleShardsConfig(moduleShardsConfig).build();
356
357         leaderNode1.waitForMembersUp("member-2");
358
359         doAddShardReplica(newReplicaNode2, "cars", "member-1");
360
361         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
362                 .moduleShardsConfig(moduleShardsConfig).build();
363
364         leaderNode1.waitForMembersUp("member-3");
365         newReplicaNode2.waitForMembersUp("member-3");
366
367         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
368
369         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
370         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
371
372         // Write data to member-2's config datastore and read/verify via member-3
373         final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
374                 newReplicaNode3.configDataStore());
375
376         // Write data to member-3's oper datastore and read/verify via member-2
377         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
378
379         // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 -
380         // 2 ServerConfigurationPayload entries,  the transaction payload entry plus a purge payload.
381
382         RaftStateVerifier verifier = raftState -> {
383             assertEquals("Commit index", 3, raftState.getCommitIndex());
384             assertEquals("Last applied index", 3, raftState.getLastApplied());
385         };
386
387         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
388         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
389
390         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
391         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
392
393         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
394         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
395
396         // Restart member-3 and verify the cars config shard is re-instated.
397
398         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
399         newReplicaNode3.cleanup();
400
401         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
402                 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
403
404         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
405         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
406     }
407
408     @Test
409     public void testAddShardReplicaFailures() throws Exception {
410         String name = "testAddShardReplicaFailures";
411         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
412                 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
413
414         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
415                 memberNode.operDataStore(), null);
416
417         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
418                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
419         verifyFailedRpcResult(rpcResult);
420
421         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
422                 .build()).get(10, TimeUnit.SECONDS);
423         verifyFailedRpcResult(rpcResult);
424
425         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
426                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
427         verifyFailedRpcResult(rpcResult);
428     }
429
430     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(AbstractDataStore writeToStore,
431             AbstractDataStore readFromStore) throws Exception {
432         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
433         NormalizedNode<?, ?> carsNode = CarsModel.create();
434         writeTx.write(CarsModel.BASE_PATH, carsNode);
435
436         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
437         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
438         assertEquals("canCommit", TRUE, canCommit);
439         cohort.preCommit().get(5, TimeUnit.SECONDS);
440         cohort.commit().get(5, TimeUnit.SECONDS);
441
442         readCarsNodeAndVerify(readFromStore, carsNode);
443         return carsNode;
444     }
445
446     private static void readCarsNodeAndVerify(AbstractDataStore readFromStore,
447             NormalizedNode<?, ?> expCarsNode) throws Exception {
448         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
449                 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
450         assertTrue("isPresent", optional.isPresent());
451         assertEquals("Data node", expCarsNode, optional.get());
452     }
453
454     private RpcResult<GetShardRoleOutput> getShardRole(final MemberNode memberNode,
455                                                        final BindingNormalizedNodeSerializer serializer,
456                                                        final String shardName) throws Exception {
457
458         final GetShardRoleInput input = new GetShardRoleInputBuilder()
459                 .setDataStoreType(DataStoreType.Config)
460                 .setShardName(shardName)
461                 .build();
462
463         final ClusterAdminRpcService service =
464                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
465
466         return service.getShardRole(input).get(10, TimeUnit.SECONDS);
467
468     }
469
470     private RpcResult<GetPrefixShardRoleOutput> getPrefixShardRole(
471             final MemberNode memberNode,
472             final InstanceIdentifier<?> identifier,
473             final BindingNormalizedNodeSerializer serializer) throws Exception {
474
475         final GetPrefixShardRoleInput input = new GetPrefixShardRoleInputBuilder()
476                 .setDataStoreType(DataStoreType.Config)
477                 .setShardPrefix(identifier)
478                 .build();
479
480         final ClusterAdminRpcService service =
481                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
482
483         return service.getPrefixShardRole(input).get(10, TimeUnit.SECONDS);
484
485     }
486
487     private void addPrefixShardReplica(final MemberNode memberNode,
488                                        final InstanceIdentifier<?> identifier,
489                                        final BindingNormalizedNodeSerializer serializer,
490                                        final String shardName,
491                                        final String... peerMemberNames) throws Exception {
492
493         final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
494                 .setShardPrefix(identifier)
495                 .setDataStoreType(DataStoreType.Config).build();
496
497         final ClusterAdminRpcService service =
498                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
499
500         final RpcResult<Void> rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS);
501         verifySuccessfulRpcResult(rpcResult);
502
503         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
504         Optional<ActorRef> optional = memberNode.configDataStore().getActorContext().findLocalShard(shardName);
505         assertTrue("Replica shard not present", optional.isPresent());
506     }
507
508     private void removePrefixShardReplica(final MemberNode memberNode,
509                                           final InstanceIdentifier<?> identifier,
510                                           final String removeFromMember,
511                                           final BindingNormalizedNodeSerializer serializer,
512                                           final String shardName,
513                                           final String... peerMemberNames) throws Exception {
514         final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
515                 .setDataStoreType(DataStoreType.Config)
516                 .setShardPrefix(identifier)
517                 .setMemberName(removeFromMember).build();
518
519         final ClusterAdminRpcService service =
520                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
521
522         final RpcResult<Void> rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS);
523         verifySuccessfulRpcResult(rpcResult);
524
525         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
526     }
527
528     private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
529             throws Exception {
530         memberNode.waitForMembersUp(peerMemberNames);
531
532         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
533                 memberNode.operDataStore(), null);
534
535         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
536                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
537         verifySuccessfulRpcResult(rpcResult);
538
539         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
540
541         Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
542         assertFalse("Oper shard present", optional.isPresent());
543
544         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
545                 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
546         verifySuccessfulRpcResult(rpcResult);
547
548         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
549     }
550
551     private static void doMakeShardLeaderLocal(final MemberNode memberNode, String shardName, String newLeader)
552             throws Exception {
553         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
554                 memberNode.operDataStore(), null);
555
556         final RpcResult<Void> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
557                 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
558                 .get(10, TimeUnit.SECONDS);
559
560         verifySuccessfulRpcResult(rpcResult);
561
562         verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
563                 containsString(newLeader)));
564
565     }
566
567     private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
568         if (!rpcResult.isSuccessful()) {
569             if (rpcResult.getErrors().size() > 0) {
570                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
571                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
572             }
573
574             fail("Rpc failed with no error");
575         }
576
577         return rpcResult.getResult();
578     }
579
580     private static void verifyFailedRpcResult(RpcResult<?> rpcResult) {
581         assertFalse("RpcResult", rpcResult.isSuccessful());
582         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
583         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
584         assertNotNull("RpcResult error message null", error.getMessage());
585     }
586
587     @Test
588     public void testRemoveShardReplica() throws Exception {
589         String name = "testRemoveShardReplica";
590         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
591         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
592                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
593                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
594                 .build();
595
596         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
597                 .moduleShardsConfig(moduleShardsConfig).build();
598
599         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
600                 .moduleShardsConfig(moduleShardsConfig).build();
601
602         leaderNode1.configDataStore().waitTillReady();
603         replicaNode3.configDataStore().waitTillReady();
604         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
605         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
606         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
607
608         // Invoke RPC service on member-3 to remove it's local shard
609
610         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
611                 replicaNode3.operDataStore(), null);
612
613         RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
614                 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
615                 .get(10, TimeUnit.SECONDS);
616         verifySuccessfulRpcResult(rpcResult);
617
618         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
619         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
620         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
621
622         // Restart member-2 and verify member-3 isn't present.
623
624         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
625         replicaNode2.cleanup();
626
627         MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
628                 .moduleShardsConfig(moduleShardsConfig).build();
629
630         newPeplicaNode2.configDataStore().waitTillReady();
631         verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
632
633         // Invoke RPC service on member-1 to remove member-2
634
635         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
636                 leaderNode1.operDataStore(), null);
637
638         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
639                 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
640         verifySuccessfulRpcResult(rpcResult);
641
642         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
643         verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
644     }
645
646     @Test
647     public void testRemoveShardLeaderReplica() throws Exception {
648         String name = "testRemoveShardLeaderReplica";
649         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
650         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
651                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
652                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
653                 .build();
654
655         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
656                 .moduleShardsConfig(moduleShardsConfig).build();
657
658         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
659                 .moduleShardsConfig(moduleShardsConfig).build();
660
661         leaderNode1.configDataStore().waitTillReady();
662         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
663         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
664         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
665
666         replicaNode2.waitForMembersUp("member-1", "member-3");
667         replicaNode3.waitForMembersUp("member-1", "member-2");
668
669         // Invoke RPC service on leader member-1 to remove it's local shard
670
671         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
672                 leaderNode1.operDataStore(), null);
673
674         RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
675                 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
676                 .get(10, TimeUnit.SECONDS);
677         verifySuccessfulRpcResult(rpcResult);
678
679         verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
680                 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
681                         containsString("member-3"))));
682
683         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
684         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
685         verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
686     }
687
688     @Test
689     public void testAddReplicasForAllShards() throws Exception {
690         String name = "testAddReplicasForAllShards";
691         String moduleShardsConfig = "module-shards-member1.conf";
692         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
693                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
694
695         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
696                                                                                  "pets", null,
697                                                                                  Collections.singletonList(MEMBER_1));
698         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
699                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
700         leaderNode1.kit().expectMsgClass(Success.class);
701         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
702
703         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
704                 .moduleShardsConfig(moduleShardsConfig).build();
705
706         leaderNode1.waitForMembersUp("member-2");
707         newReplicaNode2.waitForMembersUp("member-1");
708
709         newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
710                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
711         newReplicaNode2.kit().expectMsgClass(Success.class);
712
713         newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
714                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
715                                                              "no-leader", null,
716                                                              Collections.singletonList(MEMBER_1)),
717                                 Shard.builder(), null),
718                                 newReplicaNode2.kit().getRef());
719         newReplicaNode2.kit().expectMsgClass(Success.class);
720
721         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
722                 newReplicaNode2.operDataStore(), null);
723
724         RpcResult<AddReplicasForAllShardsOutput> rpcResult =
725                 service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
726         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
727         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
728                 successShardResult("people", DataStoreType.Config),
729                 successShardResult("pets", DataStoreType.Config),
730                 successShardResult("cars", DataStoreType.Operational),
731                 successShardResult("people", DataStoreType.Operational),
732                 failedShardResult("no-leader", DataStoreType.Operational));
733
734         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
735         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
736         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
737         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
738         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
739     }
740
741     @Test
742     public void testRemoveAllShardReplicas() throws Exception {
743         String name = "testRemoveAllShardReplicas";
744         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
745         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
746                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
747                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
748                 .build();
749
750         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
751                 .moduleShardsConfig(moduleShardsConfig).build();
752
753         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
754                 .moduleShardsConfig(moduleShardsConfig).build();
755
756         leaderNode1.configDataStore().waitTillReady();
757         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
758         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
759         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
760
761         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
762                 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
763         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
764                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
765         leaderNode1.kit().expectMsgClass(Success.class);
766
767         replicaNode2.configDataStore().getActorContext().getShardManager().tell(
768                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
769         replicaNode2.kit().expectMsgClass(Success.class);
770
771         replicaNode3.configDataStore().getActorContext().getShardManager().tell(
772                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
773         replicaNode3.kit().expectMsgClass(Success.class);
774
775         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
776         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
777         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
778
779         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
780                 replicaNode3.operDataStore(), null);
781
782         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
783                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
784         RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
785         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
786                 successShardResult("people", DataStoreType.Config),
787                 successShardResult("pets", DataStoreType.Config),
788                 successShardResult("cars", DataStoreType.Operational),
789                 successShardResult("people", DataStoreType.Operational));
790
791         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
792         verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
793         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
794         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
795         verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
796         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
797         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
798         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
799         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
800     }
801
802     @Test
803     public void testChangeMemberVotingStatesForShard() throws Exception {
804         String name = "testChangeMemberVotingStatusForShard";
805         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
806         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
807                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
808                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
809                 .build();
810
811         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
812                 .moduleShardsConfig(moduleShardsConfig).build();
813
814         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
815                 .moduleShardsConfig(moduleShardsConfig).build();
816
817         leaderNode1.configDataStore().waitTillReady();
818         replicaNode3.configDataStore().waitTillReady();
819         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
820         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
821         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
822
823         // Invoke RPC service on member-3 to change voting status
824
825         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
826                 replicaNode3.operDataStore(), null);
827
828         RpcResult<Void> rpcResult = service3
829                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
830                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
831                         .setMemberVotingState(ImmutableList.of(
832                                 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
833                                 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
834                         .build())
835                 .get(10, TimeUnit.SECONDS);
836         verifySuccessfulRpcResult(rpcResult);
837
838         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
839                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
840         verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
841                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
842         verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
843                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
844     }
845
846     @Test
847     public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
848         String name = "testChangeMemberVotingStatesForSingleNodeShard";
849         String moduleShardsConfig = "module-shards-member1.conf";
850         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
851                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
852                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
853                 .build();
854
855         leaderNode.configDataStore().waitTillReady();
856
857         // Invoke RPC service on member-3 to change voting status
858
859         ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
860                 leaderNode.operDataStore(), null);
861
862         RpcResult<Void> rpcResult = service
863                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
864                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
865                         .setMemberVotingState(ImmutableList
866                                 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
867                         .build())
868                 .get(10, TimeUnit.SECONDS);
869         verifyFailedRpcResult(rpcResult);
870
871         verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
872     }
873
874     @Test
875     public void testChangeMemberVotingStatesForAllShards() throws Exception {
876         String name = "testChangeMemberVotingStatesForAllShards";
877         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
878         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
879                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
880                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
881                 .build();
882
883         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
884                 .moduleShardsConfig(moduleShardsConfig).build();
885
886         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
887                 .moduleShardsConfig(moduleShardsConfig).build();
888
889         leaderNode1.configDataStore().waitTillReady();
890         leaderNode1.operDataStore().waitTillReady();
891         replicaNode3.configDataStore().waitTillReady();
892         replicaNode3.operDataStore().waitTillReady();
893         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
894         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
895         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
896
897         // Invoke RPC service on member-3 to change voting status
898
899         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
900                 replicaNode3.operDataStore(), null);
901
902         RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
903                 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
904                         new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
905                         new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
906                 .get(10, TimeUnit.SECONDS);
907         ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
908         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
909                 successShardResult("people", DataStoreType.Config),
910                 successShardResult("cars", DataStoreType.Operational),
911                 successShardResult("people", DataStoreType.Operational));
912
913         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
914                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
915                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
916                 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
917                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
918     }
919
920     @Test
921     public void testFlipMemberVotingStates() throws Exception {
922         String name = "testFlipMemberVotingStates";
923
924         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
925                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
926                 new ServerInfo("member-3", false)));
927
928         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
929         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
930         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
931
932         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
933         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
934                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
935                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
936                 .build();
937
938         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
939                 .moduleShardsConfig(moduleShardsConfig).build();
940
941         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
942                 .moduleShardsConfig(moduleShardsConfig).build();
943
944         leaderNode1.configDataStore().waitTillReady();
945         leaderNode1.operDataStore().waitTillReady();
946         replicaNode3.configDataStore().waitTillReady();
947         replicaNode3.operDataStore().waitTillReady();
948         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
949                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
950
951         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
952                 replicaNode3.operDataStore(), null);
953
954         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards()
955                 .get(10, TimeUnit.SECONDS);
956         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
957         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
958                 successShardResult("people", DataStoreType.Config),
959                 successShardResult("cars", DataStoreType.Operational),
960                 successShardResult("people", DataStoreType.Operational));
961
962         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
963                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
964                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
965                 new String[]{"cars", "people"},
966                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
967                 new SimpleEntry<>("member-3", TRUE));
968
969         // Leadership should have transferred to member 3 since it is the only remaining voting member.
970         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
971             assertNotNull("Expected non-null leader Id", raftState.getLeader());
972             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
973                     raftState.getLeader().contains("member-3"));
974         });
975
976         verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
977             assertNotNull("Expected non-null leader Id", raftState.getLeader());
978             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
979                     raftState.getLeader().contains("member-3"));
980         });
981
982         // Flip the voting states back to the original states.
983
984         rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
985         result = verifySuccessfulRpcResult(rpcResult);
986         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
987                 successShardResult("people", DataStoreType.Config),
988                 successShardResult("cars", DataStoreType.Operational),
989                 successShardResult("people", DataStoreType.Operational));
990
991         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
992                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
993                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
994                 new String[]{"cars", "people"},
995                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
996                 new SimpleEntry<>("member-3", FALSE));
997
998         // Leadership should have transferred to member 1 or 2.
999         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1000             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1001             assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
1002                     raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
1003         });
1004     }
1005
1006     @Test
1007     public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
1008         String name = "testFlipMemberVotingStatesWithNoInitialLeader";
1009
1010         // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
1011         // non-voting and simulated as down by not starting them up.
1012         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1013                 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
1014                 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
1015                 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
1016
1017         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
1018         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
1019         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
1020
1021         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
1022         final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
1023                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
1024                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
1025                 .build();
1026
1027         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
1028                 .moduleShardsConfig(moduleShardsConfig).build();
1029
1030         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
1031                 .moduleShardsConfig(moduleShardsConfig).build();
1032
1033         // Initially there won't be a leader b/c all the up nodes are non-voting.
1034
1035         replicaNode1.waitForMembersUp("member-2", "member-3");
1036
1037         verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
1038                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
1039                 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
1040                 new SimpleEntry<>("member-6", TRUE));
1041
1042         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
1043             assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
1044
1045         ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
1046                 replicaNode1.operDataStore(), null);
1047
1048         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
1049                 .get(10, TimeUnit.SECONDS);
1050         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1051         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1052                 successShardResult("people", DataStoreType.Config),
1053                 successShardResult("cars", DataStoreType.Operational),
1054                 successShardResult("people", DataStoreType.Operational));
1055
1056         verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
1057                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1058                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1059                 new String[]{"cars", "people"},
1060                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
1061                 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
1062                 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
1063
1064         // Since member 1 was changed to voting and there was no leader, it should've started an election
1065         // and become leader
1066         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
1067             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1068             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
1069                     raftState.getLeader().contains("member-1"));
1070         });
1071
1072         verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
1073             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1074             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
1075                     raftState.getLeader().contains("member-1"));
1076         });
1077     }
1078
1079     @Test
1080     public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
1081         String name = "testFlipMemberVotingStatesWithVotingMembersDown";
1082
1083         // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
1084         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1085                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
1086                 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
1087                 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
1088
1089         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
1090         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
1091         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
1092
1093         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
1094         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
1095                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
1096                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
1097                 .build();
1098
1099         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
1100                 .moduleShardsConfig(moduleShardsConfig).build();
1101
1102         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
1103                 .moduleShardsConfig(moduleShardsConfig).build();
1104
1105         leaderNode1.configDataStore().waitTillReady();
1106         leaderNode1.operDataStore().waitTillReady();
1107         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
1108                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
1109                 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
1110                 new SimpleEntry<>("member-6", FALSE));
1111
1112         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
1113                 leaderNode1.operDataStore(), null);
1114
1115         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
1116                 .get(10, TimeUnit.SECONDS);
1117         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1118         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1119                 successShardResult("people", DataStoreType.Config),
1120                 successShardResult("cars", DataStoreType.Operational),
1121                 successShardResult("people", DataStoreType.Operational));
1122
1123         // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
1124         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
1125                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1126                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1127                 new String[]{"cars", "people"},
1128                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
1129                 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
1130                 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
1131
1132         // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
1133         // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
1134         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1135             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1136             assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
1137         });
1138     }
1139
1140     private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
1141             String member, String datastoreTypeSuffix, String... shards) {
1142         String[] datastoreTypes = {"config_", "oper_"};
1143         for (String type : datastoreTypes) {
1144             for (String shard : shards) {
1145                 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
1146                 for (ServerInfo info : serverConfig.getServerConfig()) {
1147                     newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
1148                             type + datastoreTypeSuffix).toString(), info.isVoting()));
1149                 }
1150
1151                 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
1152                         type + datastoreTypeSuffix).toString();
1153                 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
1154                 InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,
1155                         new ServerConfigurationPayload(newServerInfo)));
1156             }
1157         }
1158     }
1159
1160     @SafeVarargs
1161     private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards,
1162             SimpleEntry<String, Boolean>... expStates) throws Exception {
1163         for (AbstractDataStore datastore: datastores) {
1164             for (String shard: shards) {
1165                 verifyVotingStates(datastore, shard, expStates);
1166             }
1167         }
1168     }
1169
1170     @SafeVarargs
1171     private static void verifyVotingStates(AbstractDataStore datastore, String shardName,
1172             SimpleEntry<String, Boolean>... expStates) throws Exception {
1173         String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
1174         Map<String, Boolean> expStateMap = new HashMap<>();
1175         for (Entry<String, Boolean> e: expStates) {
1176             expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
1177                     datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
1178         }
1179
1180         verifyRaftState(datastore, shardName, raftState -> {
1181             String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
1182                     datastore.getActorContext().getDataStoreName()).toString();
1183             assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
1184             for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1185                 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
1186             }
1187         });
1188     }
1189
1190     private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
1191         Map<String, ShardResult> expResultsMap = new HashMap<>();
1192         for (ShardResult r: expShardResults) {
1193             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1194         }
1195
1196         for (ShardResult result: shardResults) {
1197             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1198             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1199                     result.getDataStoreType()), exp);
1200             assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
1201             if (exp.isSucceeded()) {
1202                 assertNull("Expected null error message", result.getErrorMessage());
1203             } else {
1204                 assertNotNull("Expected error message", result.getErrorMessage());
1205             }
1206         }
1207
1208         if (!expResultsMap.isEmpty()) {
1209             fail("Missing shard results for " + expResultsMap.keySet());
1210         }
1211     }
1212
1213     private static ShardResult successShardResult(String shardName, DataStoreType type) {
1214         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1215     }
1216
1217     private static ShardResult failedShardResult(String shardName, DataStoreType type) {
1218         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();
1219     }
1220 }