Rename ActorContext to ActorUtils
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertFalse;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertThat;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
24
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.base.Optional;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Sets;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.net.URI;
38 import java.util.AbstractMap.SimpleEntry;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Map.Entry;
47 import java.util.Set;
48 import java.util.concurrent.TimeUnit;
49 import org.apache.commons.lang3.SerializationUtils;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.Mockito;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
57 import org.opendaylight.controller.cluster.datastore.MemberNode;
58 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
66 import org.opendaylight.controller.cluster.raft.RaftState;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
74 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
75 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
76 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
77 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
78 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
79 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
80 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.People;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaOutput;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInputBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInputBuilder;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInputBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaOutput;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
117 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
118 import org.opendaylight.yangtools.yang.common.RpcError;
119 import org.opendaylight.yangtools.yang.common.RpcResult;
120 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
121
122 /**
123  * Unit tests for ClusterAdminRpcService.
124  *
125  * @author Thomas Pantelis
126  */
127 public class ClusterAdminRpcServiceTest {
128     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
129     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
130     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
131     private final List<MemberNode> memberNodes = new ArrayList<>();
132
133     @Before
134     public void setUp() {
135         InMemoryJournal.clear();
136         InMemorySnapshotStore.clear();
137     }
138
139     @After
140     public void tearDown() {
141         for (MemberNode m : Lists.reverse(memberNodes)) {
142             m.cleanup();
143         }
144         memberNodes.clear();
145     }
146
147     @Test
148     public void testBackupDatastore() throws Exception {
149         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
150                 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
151                 .testName("testBackupDatastore").build();
152
153         String fileName = "target/testBackupDatastore";
154         new File(fileName).delete();
155
156         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
157
158         RpcResult<BackupDatastoreOutput> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
159                 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
160         verifySuccessfulRpcResult(rpcResult);
161
162         try (FileInputStream fis = new FileInputStream(fileName)) {
163             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
164             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
165
166             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
167                     snapshots.get(1).getType(), snapshots.get(1));
168             verifyDatastoreSnapshot(node.configDataStore().getActorUtils().getDataStoreName(),
169                     map.get(node.configDataStore().getActorUtils().getDataStoreName()), "cars", "people");
170         } finally {
171             new File(fileName).delete();
172         }
173
174         // Test failure by killing a shard.
175
176         node.configDataStore().getActorUtils().getShardManager().tell(node.datastoreContextBuilder()
177                 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
178
179         ActorRef carsShardActor = node.configDataStore().getActorUtils().findLocalShard("cars").get();
180         node.kit().watch(carsShardActor);
181         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
182         node.kit().expectTerminated(carsShardActor);
183
184         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
185                 .get(5, TimeUnit.SECONDS);
186         assertFalse("isSuccessful", rpcResult.isSuccessful());
187         assertEquals("getErrors", 1, rpcResult.getErrors().size());
188     }
189
190     private static void verifyDatastoreSnapshot(final String type, final DatastoreSnapshot datastoreSnapshot,
191             final String... expShardNames) {
192         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
193         Set<String> shardNames = new HashSet<>();
194         for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
195             shardNames.add(s.getName());
196         }
197
198         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
199     }
200
201     @Test
202     public void testAddRemovePrefixShardReplica() throws Exception {
203         String name = "testAddPrefixShardReplica";
204         String moduleShardsConfig = "module-shards-default.conf";
205
206         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
207                 .moduleShardsConfig(moduleShardsConfig).build();
208         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
209                 .moduleShardsConfig(moduleShardsConfig).build();
210         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
211                 .moduleShardsConfig(moduleShardsConfig).build();
212
213         member1.waitForMembersUp("member-2", "member-3");
214         replicaNode2.kit().waitForMembersUp("member-1", "member-3");
215         replicaNode3.kit().waitForMembersUp("member-1", "member-2");
216
217         final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager();
218
219         shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
220                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
221                         "prefix", Collections.singleton(MEMBER_1))),
222                 ActorRef.noSender());
223
224         member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(),
225                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
226
227         final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
228         final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
229         Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
230
231         addPrefixShardReplica(replicaNode2, identifier, serializer,
232                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
233
234         addPrefixShardReplica(replicaNode3, identifier, serializer,
235                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
236
237         verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
238                 "member-2", "member-3");
239
240         removePrefixShardReplica(member1, identifier, "member-3", serializer,
241                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
242
243         verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
244         verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
245                 "member-1");
246
247         removePrefixShardReplica(member1, identifier, "member-2", serializer,
248                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
249
250         verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
251     }
252
253     @Test
254     public void testGetShardRole() throws Exception {
255         String name = "testGetShardRole";
256         String moduleShardsConfig = "module-shards-default-member-1.conf";
257
258         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
259                 .moduleShardsConfig(moduleShardsConfig).build();
260
261         member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default");
262
263         final RpcResult<GetShardRoleOutput> successResult =
264                 getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "default");
265         verifySuccessfulRpcResult(successResult);
266         assertEquals("Leader", successResult.getResult().getRole());
267
268         final RpcResult<GetShardRoleOutput> failedResult =
269                 getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "cars");
270
271         verifyFailedRpcResult(failedResult);
272
273         final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager();
274
275         shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
276                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
277                         "prefix", Collections.singleton(MEMBER_1))),
278                 ActorRef.noSender());
279
280         member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(),
281                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
282
283         final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
284         final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
285         Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
286
287         final RpcResult<GetPrefixShardRoleOutput> prefixSuccessResult =
288                 getPrefixShardRole(member1, identifier, serializer);
289
290         verifySuccessfulRpcResult(prefixSuccessResult);
291         assertEquals("Leader", prefixSuccessResult.getResult().getRole());
292
293         final InstanceIdentifier<People> peopleId = InstanceIdentifier.create(People.class);
294         Mockito.doReturn(PeopleModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(peopleId);
295
296         final RpcResult<GetPrefixShardRoleOutput> prefixFail =
297                 getPrefixShardRole(member1, peopleId, serializer);
298
299         verifyFailedRpcResult(prefixFail);
300     }
301
302     @Test
303     public void testGetPrefixShardRole() throws Exception {
304         String name = "testGetPrefixShardRole";
305         String moduleShardsConfig = "module-shards-default-member-1.conf";
306
307         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
308                 .moduleShardsConfig(moduleShardsConfig).build();
309
310         member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default");
311
312
313     }
314
315     @Test
316     public void testModuleShardLeaderMovement() throws Exception {
317         String name = "testModuleShardLeaderMovement";
318         String moduleShardsConfig = "module-shards-member1.conf";
319
320         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
321                 .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
322         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
323                 .moduleShardsConfig(moduleShardsConfig).build();
324         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
325                 .moduleShardsConfig(moduleShardsConfig).build();
326
327         member1.waitForMembersUp("member-2", "member-3");
328         replicaNode2.waitForMembersUp("member-1");
329         replicaNode3.waitForMembersUp("member-1", "member-2");
330
331         doAddShardReplica(replicaNode2, "cars", "member-1");
332         doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
333
334         verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
335
336         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
337
338         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
339
340         doMakeShardLeaderLocal(member1, "cars", "member-1");
341         verifyRaftState(replicaNode2.configDataStore(), "cars",
342             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
343         verifyRaftState(replicaNode3.configDataStore(), "cars",
344             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
345
346         doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
347         verifyRaftState(member1.configDataStore(), "cars",
348             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
349         verifyRaftState(replicaNode3.configDataStore(), "cars",
350             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
351
352         replicaNode2.waitForMembersUp("member-3");
353         doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
354     }
355
356     @Test
357     public void testAddShardReplica() throws Exception {
358         String name = "testAddShardReplica";
359         String moduleShardsConfig = "module-shards-cars-member-1.conf";
360         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
361                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
362
363         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
364                 .moduleShardsConfig(moduleShardsConfig).build();
365
366         leaderNode1.waitForMembersUp("member-2");
367
368         doAddShardReplica(newReplicaNode2, "cars", "member-1");
369
370         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
371                 .moduleShardsConfig(moduleShardsConfig).build();
372
373         leaderNode1.waitForMembersUp("member-3");
374         newReplicaNode2.waitForMembersUp("member-3");
375
376         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
377
378         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
379         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
380
381         // Write data to member-2's config datastore and read/verify via member-3
382         final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
383                 newReplicaNode3.configDataStore());
384
385         // Write data to member-3's oper datastore and read/verify via member-2
386         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
387
388         // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 -
389         // 2 ServerConfigurationPayload entries,  the transaction payload entry plus a purge payload.
390
391         RaftStateVerifier verifier = raftState -> {
392             assertEquals("Commit index", 3, raftState.getCommitIndex());
393             assertEquals("Last applied index", 3, raftState.getLastApplied());
394         };
395
396         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
397         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
398
399         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
400         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
401
402         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
403         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
404
405         // Restart member-3 and verify the cars config shard is re-instated.
406
407         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
408         newReplicaNode3.cleanup();
409
410         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
411                 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
412
413         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
414         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
415     }
416
417     @Test
418     public void testAddShardReplicaFailures() throws Exception {
419         String name = "testAddShardReplicaFailures";
420         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
421                 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
422
423         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
424                 memberNode.operDataStore(), null);
425
426         RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
427                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
428         verifyFailedRpcResult(rpcResult);
429
430         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
431                 .build()).get(10, TimeUnit.SECONDS);
432         verifyFailedRpcResult(rpcResult);
433
434         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
435                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
436         verifyFailedRpcResult(rpcResult);
437     }
438
439     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(final AbstractDataStore writeToStore,
440             final AbstractDataStore readFromStore) throws Exception {
441         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
442         NormalizedNode<?, ?> carsNode = CarsModel.create();
443         writeTx.write(CarsModel.BASE_PATH, carsNode);
444
445         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
446         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
447         assertEquals("canCommit", TRUE, canCommit);
448         cohort.preCommit().get(5, TimeUnit.SECONDS);
449         cohort.commit().get(5, TimeUnit.SECONDS);
450
451         readCarsNodeAndVerify(readFromStore, carsNode);
452         return carsNode;
453     }
454
455     private static void readCarsNodeAndVerify(final AbstractDataStore readFromStore,
456             final NormalizedNode<?, ?> expCarsNode) throws Exception {
457         java.util.Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
458                 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
459         assertTrue("isPresent", optional.isPresent());
460         assertEquals("Data node", expCarsNode, optional.get());
461     }
462
463     private static RpcResult<GetShardRoleOutput> getShardRole(final MemberNode memberNode,
464             final BindingNormalizedNodeSerializer serializer, final String shardName) throws Exception {
465
466         final GetShardRoleInput input = new GetShardRoleInputBuilder()
467                 .setDataStoreType(DataStoreType.Config)
468                 .setShardName(shardName)
469                 .build();
470
471         final ClusterAdminRpcService service =
472                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
473
474         return service.getShardRole(input).get(10, TimeUnit.SECONDS);
475     }
476
477     private static RpcResult<GetPrefixShardRoleOutput> getPrefixShardRole(
478             final MemberNode memberNode,
479             final InstanceIdentifier<?> identifier,
480             final BindingNormalizedNodeSerializer serializer) throws Exception {
481
482         final GetPrefixShardRoleInput input = new GetPrefixShardRoleInputBuilder()
483                 .setDataStoreType(DataStoreType.Config)
484                 .setShardPrefix(identifier)
485                 .build();
486
487         final ClusterAdminRpcService service =
488                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
489
490         return service.getPrefixShardRole(input).get(10, TimeUnit.SECONDS);
491     }
492
493     private static void addPrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier<?> identifier,
494             final BindingNormalizedNodeSerializer serializer, final String shardName,
495             final String... peerMemberNames) throws Exception {
496
497         final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
498                 .setShardPrefix(identifier)
499                 .setDataStoreType(DataStoreType.Config).build();
500
501         final ClusterAdminRpcService service =
502                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
503
504         final RpcResult<AddPrefixShardReplicaOutput> rpcResult = service.addPrefixShardReplica(input)
505                 .get(10, TimeUnit.SECONDS);
506         verifySuccessfulRpcResult(rpcResult);
507
508         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
509         Optional<ActorRef> optional = memberNode.configDataStore().getActorUtils().findLocalShard(shardName);
510         assertTrue("Replica shard not present", optional.isPresent());
511     }
512
513     private static void removePrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier<?> identifier,
514             final String removeFromMember, final BindingNormalizedNodeSerializer serializer, final String shardName,
515             final String... peerMemberNames) throws Exception {
516         final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
517                 .setDataStoreType(DataStoreType.Config)
518                 .setShardPrefix(identifier)
519                 .setMemberName(removeFromMember).build();
520
521         final ClusterAdminRpcService service =
522                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
523
524         final RpcResult<RemovePrefixShardReplicaOutput> rpcResult = service.removePrefixShardReplica(input)
525                 .get(10, TimeUnit.SECONDS);
526         verifySuccessfulRpcResult(rpcResult);
527
528         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
529     }
530
531     private static void doAddShardReplica(final MemberNode memberNode, final String shardName,
532             final String... peerMemberNames) throws Exception {
533         memberNode.waitForMembersUp(peerMemberNames);
534
535         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
536                 memberNode.operDataStore(), null);
537
538         RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
539             .setShardName(shardName).setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
540         verifySuccessfulRpcResult(rpcResult);
541
542         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
543
544         Optional<ActorRef> optional = memberNode.operDataStore().getActorUtils().findLocalShard(shardName);
545         assertFalse("Oper shard present", optional.isPresent());
546
547         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
548                 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
549         verifySuccessfulRpcResult(rpcResult);
550
551         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
552     }
553
554     private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName,
555             final String newLeader) throws Exception {
556         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
557                 memberNode.operDataStore(), null);
558
559         final RpcResult<MakeLeaderLocalOutput> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
560                 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
561                 .get(10, TimeUnit.SECONDS);
562
563         verifySuccessfulRpcResult(rpcResult);
564
565         verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
566                 containsString(newLeader)));
567     }
568
569     private static <T> T verifySuccessfulRpcResult(final RpcResult<T> rpcResult) {
570         if (!rpcResult.isSuccessful()) {
571             if (rpcResult.getErrors().size() > 0) {
572                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
573                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
574             }
575
576             fail("Rpc failed with no error");
577         }
578
579         return rpcResult.getResult();
580     }
581
582     private static void verifyFailedRpcResult(final RpcResult<?> rpcResult) {
583         assertFalse("RpcResult", rpcResult.isSuccessful());
584         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
585         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
586         assertNotNull("RpcResult error message null", error.getMessage());
587     }
588
589     @Test
590     public void testRemoveShardReplica() throws Exception {
591         String name = "testRemoveShardReplica";
592         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
593         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
594                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
595                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
596                 .build();
597
598         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
599                 .moduleShardsConfig(moduleShardsConfig).build();
600
601         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
602                 .moduleShardsConfig(moduleShardsConfig).build();
603
604         leaderNode1.configDataStore().waitTillReady();
605         replicaNode3.configDataStore().waitTillReady();
606         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
607         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
608         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
609
610         // Invoke RPC service on member-3 to remove it's local shard
611
612         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
613                 replicaNode3.operDataStore(), null);
614
615         RpcResult<RemoveShardReplicaOutput> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
616                 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
617                 .get(10, TimeUnit.SECONDS);
618         verifySuccessfulRpcResult(rpcResult);
619
620         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
621         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
622         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
623
624         // Restart member-2 and verify member-3 isn't present.
625
626         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
627         replicaNode2.cleanup();
628
629         MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
630                 .moduleShardsConfig(moduleShardsConfig).build();
631
632         newPeplicaNode2.configDataStore().waitTillReady();
633         verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
634
635         // Invoke RPC service on member-1 to remove member-2
636
637         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
638                 leaderNode1.operDataStore(), null);
639
640         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
641                 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
642         verifySuccessfulRpcResult(rpcResult);
643
644         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
645         verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
646     }
647
648     @Test
649     public void testRemoveShardLeaderReplica() throws Exception {
650         String name = "testRemoveShardLeaderReplica";
651         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
652         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
653                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
654                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
655                 .build();
656
657         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
658                 .moduleShardsConfig(moduleShardsConfig).build();
659
660         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
661                 .moduleShardsConfig(moduleShardsConfig).build();
662
663         leaderNode1.configDataStore().waitTillReady();
664         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
665         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
666         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
667
668         replicaNode2.waitForMembersUp("member-1", "member-3");
669         replicaNode3.waitForMembersUp("member-1", "member-2");
670
671         // Invoke RPC service on leader member-1 to remove it's local shard
672
673         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
674                 leaderNode1.operDataStore(), null);
675
676         RpcResult<RemoveShardReplicaOutput> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
677                 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
678                 .get(10, TimeUnit.SECONDS);
679         verifySuccessfulRpcResult(rpcResult);
680
681         verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
682                 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
683                         containsString("member-3"))));
684
685         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
686         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
687         verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
688     }
689
690     @Test
691     public void testAddReplicasForAllShards() throws Exception {
692         String name = "testAddReplicasForAllShards";
693         String moduleShardsConfig = "module-shards-member1.conf";
694         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
695                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
696
697         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
698                                                                                  "pets", null,
699                                                                                  Collections.singletonList(MEMBER_1));
700         leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
701                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
702         leaderNode1.kit().expectMsgClass(Success.class);
703         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorUtils(), "pets");
704
705         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
706                 .moduleShardsConfig(moduleShardsConfig).build();
707
708         leaderNode1.waitForMembersUp("member-2");
709         newReplicaNode2.waitForMembersUp("member-1");
710
711         newReplicaNode2.configDataStore().getActorUtils().getShardManager().tell(
712                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
713         newReplicaNode2.kit().expectMsgClass(Success.class);
714
715         newReplicaNode2.operDataStore().getActorUtils().getShardManager().tell(
716                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
717                                                              "no-leader", null,
718                                                              Collections.singletonList(MEMBER_1)),
719                                 Shard.builder(), null),
720                                 newReplicaNode2.kit().getRef());
721         newReplicaNode2.kit().expectMsgClass(Success.class);
722
723         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
724                 newReplicaNode2.operDataStore(), null);
725
726         RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards(
727             new AddReplicasForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
728         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
729         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
730                 successShardResult("people", DataStoreType.Config),
731                 successShardResult("pets", DataStoreType.Config),
732                 successShardResult("cars", DataStoreType.Operational),
733                 successShardResult("people", DataStoreType.Operational),
734                 failedShardResult("no-leader", DataStoreType.Operational));
735
736         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
737         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
738         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
739         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
740         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
741     }
742
743     @Test
744     public void testRemoveAllShardReplicas() throws Exception {
745         String name = "testRemoveAllShardReplicas";
746         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
747         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
748                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
749                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
750                 .build();
751
752         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
753                 .moduleShardsConfig(moduleShardsConfig).build();
754
755         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
756                 .moduleShardsConfig(moduleShardsConfig).build();
757
758         leaderNode1.configDataStore().waitTillReady();
759         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
760         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
761         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
762
763         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
764                 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
765         leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
766                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
767         leaderNode1.kit().expectMsgClass(Success.class);
768
769         replicaNode2.configDataStore().getActorUtils().getShardManager().tell(
770                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
771         replicaNode2.kit().expectMsgClass(Success.class);
772
773         replicaNode3.configDataStore().getActorUtils().getShardManager().tell(
774                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
775         replicaNode3.kit().expectMsgClass(Success.class);
776
777         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
778         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
779         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
780
781         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
782                 replicaNode3.operDataStore(), null);
783
784         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
785                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
786         RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
787         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
788                 successShardResult("people", DataStoreType.Config),
789                 successShardResult("pets", DataStoreType.Config),
790                 successShardResult("cars", DataStoreType.Operational),
791                 successShardResult("people", DataStoreType.Operational));
792
793         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
794         verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
795         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
796         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
797         verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
798         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
799         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
800         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
801         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
802     }
803
804     @Test
805     public void testChangeMemberVotingStatesForShard() throws Exception {
806         String name = "testChangeMemberVotingStatusForShard";
807         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
808         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
809                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
810                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
811                 .build();
812
813         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
814                 .moduleShardsConfig(moduleShardsConfig).build();
815
816         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
817                 .moduleShardsConfig(moduleShardsConfig).build();
818
819         leaderNode1.configDataStore().waitTillReady();
820         replicaNode3.configDataStore().waitTillReady();
821         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
822         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
823         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
824
825         // Invoke RPC service on member-3 to change voting status
826
827         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
828                 replicaNode3.operDataStore(), null);
829
830         RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service3
831                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
832                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
833                         .setMemberVotingState(ImmutableList.of(
834                                 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
835                                 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
836                         .build())
837                 .get(10, TimeUnit.SECONDS);
838         verifySuccessfulRpcResult(rpcResult);
839
840         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
841                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
842         verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
843                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
844         verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
845                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
846     }
847
848     @Test
849     public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
850         String name = "testChangeMemberVotingStatesForSingleNodeShard";
851         String moduleShardsConfig = "module-shards-member1.conf";
852         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
853                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
854                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
855                 .build();
856
857         leaderNode.configDataStore().waitTillReady();
858
859         // Invoke RPC service on member-3 to change voting status
860
861         ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
862                 leaderNode.operDataStore(), null);
863
864         RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service
865                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
866                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
867                         .setMemberVotingState(ImmutableList
868                                 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
869                         .build())
870                 .get(10, TimeUnit.SECONDS);
871         verifyFailedRpcResult(rpcResult);
872
873         verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
874     }
875
876     @Test
877     public void testChangeMemberVotingStatesForAllShards() throws Exception {
878         String name = "testChangeMemberVotingStatesForAllShards";
879         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
880         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
881                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
882                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
883                 .build();
884
885         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
886                 .moduleShardsConfig(moduleShardsConfig).build();
887
888         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
889                 .moduleShardsConfig(moduleShardsConfig).build();
890
891         leaderNode1.configDataStore().waitTillReady();
892         leaderNode1.operDataStore().waitTillReady();
893         replicaNode3.configDataStore().waitTillReady();
894         replicaNode3.operDataStore().waitTillReady();
895         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
896         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
897         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
898
899         // Invoke RPC service on member-3 to change voting status
900
901         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
902                 replicaNode3.operDataStore(), null);
903
904         RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
905                 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
906                         new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
907                         new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
908                 .get(10, TimeUnit.SECONDS);
909         ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
910         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
911                 successShardResult("people", DataStoreType.Config),
912                 successShardResult("cars", DataStoreType.Operational),
913                 successShardResult("people", DataStoreType.Operational));
914
915         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
916                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
917                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
918                 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
919                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
920     }
921
922     @Test
923     public void testFlipMemberVotingStates() throws Exception {
924         String name = "testFlipMemberVotingStates";
925
926         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
927                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
928                 new ServerInfo("member-3", false)));
929
930         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
931         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
932         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
933
934         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
935         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
936                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder()
937                         .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10))
938                 .build();
939
940         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
941                 .moduleShardsConfig(moduleShardsConfig).build();
942
943         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
944                 .moduleShardsConfig(moduleShardsConfig).build();
945
946         leaderNode1.configDataStore().waitTillReady();
947         leaderNode1.operDataStore().waitTillReady();
948         replicaNode3.configDataStore().waitTillReady();
949         replicaNode3.operDataStore().waitTillReady();
950         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
951                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
952
953         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
954                 replicaNode3.operDataStore(), null);
955
956         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards(
957             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
958         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
959         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
960                 successShardResult("people", DataStoreType.Config),
961                 successShardResult("cars", DataStoreType.Operational),
962                 successShardResult("people", DataStoreType.Operational));
963
964         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
965                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
966                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
967                 new String[]{"cars", "people"},
968                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
969                 new SimpleEntry<>("member-3", TRUE));
970
971         // Leadership should have transferred to member 3 since it is the only remaining voting member.
972         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
973             assertNotNull("Expected non-null leader Id", raftState.getLeader());
974             assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
975                     raftState.getLeader().contains("member-3"));
976         });
977
978         verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
979             assertNotNull("Expected non-null leader Id", raftState.getLeader());
980             assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
981                     raftState.getLeader().contains("member-3"));
982         });
983
984         // Flip the voting states back to the original states.
985
986         rpcResult = service3.flipMemberVotingStatesForAllShards(
987             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
988         result = verifySuccessfulRpcResult(rpcResult);
989         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
990                 successShardResult("people", DataStoreType.Config),
991                 successShardResult("cars", DataStoreType.Operational),
992                 successShardResult("people", DataStoreType.Operational));
993
994         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
995                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
996                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
997                 new String[]{"cars", "people"},
998                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
999                 new SimpleEntry<>("member-3", FALSE));
1000
1001         // Leadership should have transferred to member 1 or 2.
1002         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1003             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1004             assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
1005                     raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
1006         });
1007     }
1008
1009     @Test
1010     public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
1011         String name = "testFlipMemberVotingStatesWithNoInitialLeader";
1012
1013         // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
1014         // non-voting and simulated as down by not starting them up.
1015         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1016                 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
1017                 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
1018                 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
1019
1020         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
1021         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
1022         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
1023
1024         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
1025         final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
1026                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
1027                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
1028                 .build();
1029
1030         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
1031                 .moduleShardsConfig(moduleShardsConfig).build();
1032
1033         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
1034                 .moduleShardsConfig(moduleShardsConfig).build();
1035
1036         // Initially there won't be a leader b/c all the up nodes are non-voting.
1037
1038         replicaNode1.waitForMembersUp("member-2", "member-3");
1039
1040         verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
1041                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
1042                 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
1043                 new SimpleEntry<>("member-6", TRUE));
1044
1045         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
1046             assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
1047
1048         ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
1049                 replicaNode1.operDataStore(), null);
1050
1051         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
1052             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
1053         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1054         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1055                 successShardResult("people", DataStoreType.Config),
1056                 successShardResult("cars", DataStoreType.Operational),
1057                 successShardResult("people", DataStoreType.Operational));
1058
1059         verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
1060                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1061                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1062                 new String[]{"cars", "people"},
1063                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
1064                 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
1065                 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
1066
1067         // Since member 1 was changed to voting and there was no leader, it should've started and election
1068         // and become leader
1069         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
1070             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1071             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
1072                     raftState.getLeader().contains("member-1"));
1073         });
1074
1075         verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
1076             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1077             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
1078                     raftState.getLeader().contains("member-1"));
1079         });
1080     }
1081
1082     @Test
1083     public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
1084         String name = "testFlipMemberVotingStatesWithVotingMembersDown";
1085
1086         // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
1087         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1088                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
1089                 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
1090                 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
1091
1092         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
1093         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
1094         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
1095
1096         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
1097         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
1098                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
1099                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
1100                 .build();
1101
1102         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
1103                 .moduleShardsConfig(moduleShardsConfig).build();
1104
1105         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
1106                 .moduleShardsConfig(moduleShardsConfig).build();
1107
1108         leaderNode1.configDataStore().waitTillReady();
1109         leaderNode1.operDataStore().waitTillReady();
1110         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
1111                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
1112                 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
1113                 new SimpleEntry<>("member-6", FALSE));
1114
1115         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
1116                 leaderNode1.operDataStore(), null);
1117
1118         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
1119             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
1120         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1121         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1122                 successShardResult("people", DataStoreType.Config),
1123                 successShardResult("cars", DataStoreType.Operational),
1124                 successShardResult("people", DataStoreType.Operational));
1125
1126         // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
1127         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
1128                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1129                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1130                 new String[]{"cars", "people"},
1131                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
1132                 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
1133                 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
1134
1135         // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
1136         // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
1137         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1138             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1139             assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
1140         });
1141     }
1142
1143     private static void setupPersistedServerConfigPayload(final ServerConfigurationPayload serverConfig,
1144             final String member, final String datastoreTypeSuffix, final String... shards) {
1145         String[] datastoreTypes = {"config_", "oper_"};
1146         for (String type : datastoreTypes) {
1147             for (String shard : shards) {
1148                 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
1149                 for (ServerInfo info : serverConfig.getServerConfig()) {
1150                     newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
1151                             type + datastoreTypeSuffix).toString(), info.isVoting()));
1152                 }
1153
1154                 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
1155                         type + datastoreTypeSuffix).toString();
1156                 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
1157                 InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,
1158                         new ServerConfigurationPayload(newServerInfo)));
1159             }
1160         }
1161     }
1162
1163     @SafeVarargs
1164     private static void verifyVotingStates(final AbstractDataStore[] datastores, final String[] shards,
1165             final SimpleEntry<String, Boolean>... expStates) throws Exception {
1166         for (AbstractDataStore datastore: datastores) {
1167             for (String shard: shards) {
1168                 verifyVotingStates(datastore, shard, expStates);
1169             }
1170         }
1171     }
1172
1173     @SafeVarargs
1174     private static void verifyVotingStates(final AbstractDataStore datastore, final String shardName,
1175             final SimpleEntry<String, Boolean>... expStates) throws Exception {
1176         String localMemberName = datastore.getActorUtils().getCurrentMemberName().getName();
1177         Map<String, Boolean> expStateMap = new HashMap<>();
1178         for (Entry<String, Boolean> e: expStates) {
1179             expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
1180                     datastore.getActorUtils().getDataStoreName()).toString(), e.getValue());
1181         }
1182
1183         verifyRaftState(datastore, shardName, raftState -> {
1184             String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
1185                     datastore.getActorUtils().getDataStoreName()).toString();
1186             assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
1187             for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1188                 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
1189             }
1190         });
1191     }
1192
1193     private static void verifyShardResults(final List<ShardResult> shardResults, final ShardResult... expShardResults) {
1194         Map<String, ShardResult> expResultsMap = new HashMap<>();
1195         for (ShardResult r: expShardResults) {
1196             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1197         }
1198
1199         for (ShardResult result: shardResults) {
1200             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1201             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1202                     result.getDataStoreType()), exp);
1203             assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
1204             if (exp.isSucceeded()) {
1205                 assertNull("Expected null error message", result.getErrorMessage());
1206             } else {
1207                 assertNotNull("Expected error message", result.getErrorMessage());
1208             }
1209         }
1210
1211         if (!expResultsMap.isEmpty()) {
1212             fail("Missing shard results for " + expResultsMap.keySet());
1213         }
1214     }
1215
1216     private static ShardResult successShardResult(final String shardName, final DataStoreType type) {
1217         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1218     }
1219
1220     private static ShardResult failedShardResult(final String shardName, final DataStoreType type) {
1221         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();
1222     }
1223 }