0ba06e3bcd694648cbc3a6f0165582ce4ebc1ee9
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static java.lang.Boolean.FALSE;
11 import static java.lang.Boolean.TRUE;
12 import static org.hamcrest.CoreMatchers.anyOf;
13 import static org.hamcrest.CoreMatchers.containsString;
14 import static org.hamcrest.MatcherAssert.assertThat;
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertFalse;
17 import static org.junit.Assert.assertNotNull;
18 import static org.junit.Assert.assertNull;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
22 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
23 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
24
25 import akka.actor.ActorRef;
26 import akka.actor.PoisonPill;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import com.google.common.collect.ImmutableList;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.collect.Iterables;
32 import com.google.common.collect.Lists;
33 import com.google.common.collect.Sets;
34 import java.io.File;
35 import java.io.FileInputStream;
36 import java.net.URI;
37 import java.util.AbstractMap.SimpleEntry;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.HashMap;
42 import java.util.HashSet;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Map.Entry;
46 import java.util.Optional;
47 import java.util.Set;
48 import java.util.concurrent.TimeUnit;
49 import org.apache.commons.lang3.SerializationUtils;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.Mockito;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
57 import org.opendaylight.controller.cluster.datastore.MemberNode;
58 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
66 import org.opendaylight.controller.cluster.raft.RaftState;
67 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
68 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
69 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
74 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
75 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
76 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
77 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
78 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
79 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
80 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.People;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaOutput;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInputBuilder;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInputBuilder;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInputBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaOutput;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
117 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey;
118 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
119 import org.opendaylight.yangtools.yang.common.RpcError;
120 import org.opendaylight.yangtools.yang.common.RpcResult;
121 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
122
123 /**
124  * Unit tests for ClusterAdminRpcService.
125  *
126  * @author Thomas Pantelis
127  */
128 public class ClusterAdminRpcServiceTest {
129     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
130     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
131     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
132     private final List<MemberNode> memberNodes = new ArrayList<>();
133
134     @Before
135     public void setUp() {
136         InMemoryJournal.clear();
137         InMemorySnapshotStore.clear();
138     }
139
140     @After
141     public void tearDown() {
142         for (MemberNode m : Lists.reverse(memberNodes)) {
143             m.cleanup();
144         }
145         memberNodes.clear();
146     }
147
148     @Test
149     public void testBackupDatastore() throws Exception {
150         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
151                 .moduleShardsConfig("module-shards-member1.conf").waitForShardLeader("cars", "people")
152                 .testName("testBackupDatastore").build();
153
154         String fileName = "target/testBackupDatastore";
155         new File(fileName).delete();
156
157         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
158
159         RpcResult<BackupDatastoreOutput> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
160                 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
161         verifySuccessfulRpcResult(rpcResult);
162
163         try (FileInputStream fis = new FileInputStream(fileName)) {
164             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
165             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
166
167             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
168                     snapshots.get(1).getType(), snapshots.get(1));
169             verifyDatastoreSnapshot(node.configDataStore().getActorUtils().getDataStoreName(),
170                     map.get(node.configDataStore().getActorUtils().getDataStoreName()), "cars", "people");
171         } finally {
172             new File(fileName).delete();
173         }
174
175         // Test failure by killing a shard.
176
177         node.configDataStore().getActorUtils().getShardManager().tell(node.datastoreContextBuilder()
178                 .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
179
180         ActorRef carsShardActor = node.configDataStore().getActorUtils().findLocalShard("cars").get();
181         node.kit().watch(carsShardActor);
182         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
183         node.kit().expectTerminated(carsShardActor);
184
185         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build())
186                 .get(5, TimeUnit.SECONDS);
187         assertFalse("isSuccessful", rpcResult.isSuccessful());
188         assertEquals("getErrors", 1, rpcResult.getErrors().size());
189     }
190
191     private static void verifyDatastoreSnapshot(final String type, final DatastoreSnapshot datastoreSnapshot,
192             final String... expShardNames) {
193         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
194         Set<String> shardNames = new HashSet<>();
195         for (DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
196             shardNames.add(s.getName());
197         }
198
199         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
200     }
201
202     @Test
203     public void testAddRemovePrefixShardReplica() throws Exception {
204         String name = "testAddPrefixShardReplica";
205         String moduleShardsConfig = "module-shards-default.conf";
206
207         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
208                 .moduleShardsConfig(moduleShardsConfig).build();
209         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
210                 .moduleShardsConfig(moduleShardsConfig).build();
211         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
212                 .moduleShardsConfig(moduleShardsConfig).build();
213
214         member1.waitForMembersUp("member-2", "member-3");
215         replicaNode2.kit().waitForMembersUp("member-1", "member-3");
216         replicaNode3.kit().waitForMembersUp("member-1", "member-2");
217
218         final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager();
219
220         shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
221                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
222                         "prefix", Collections.singleton(MEMBER_1))),
223                 ActorRef.noSender());
224
225         member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(),
226                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
227
228         final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
229         final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
230         Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
231
232         addPrefixShardReplica(replicaNode2, identifier, serializer,
233                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
234
235         addPrefixShardReplica(replicaNode3, identifier, serializer,
236                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
237
238         verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
239                 "member-2", "member-3");
240
241         removePrefixShardReplica(member1, identifier, "member-3", serializer,
242                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
243
244         verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
245         verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
246                 "member-1");
247
248         removePrefixShardReplica(member1, identifier, "member-2", serializer,
249                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
250
251         verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
252     }
253
254     @Test
255     public void testGetShardRole() throws Exception {
256         String name = "testGetShardRole";
257         String moduleShardsConfig = "module-shards-default-member-1.conf";
258
259         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
260                 .moduleShardsConfig(moduleShardsConfig).build();
261
262         member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default");
263
264         final RpcResult<GetShardRoleOutput> successResult =
265                 getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "default");
266         verifySuccessfulRpcResult(successResult);
267         assertEquals("Leader", successResult.getResult().getRole());
268
269         final RpcResult<GetShardRoleOutput> failedResult =
270                 getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "cars");
271
272         verifyFailedRpcResult(failedResult);
273
274         final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager();
275
276         shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
277                         new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
278                         "prefix", Collections.singleton(MEMBER_1))),
279                 ActorRef.noSender());
280
281         member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(),
282                 ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
283
284         final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
285         final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
286         Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
287
288         final RpcResult<GetPrefixShardRoleOutput> prefixSuccessResult =
289                 getPrefixShardRole(member1, identifier, serializer);
290
291         verifySuccessfulRpcResult(prefixSuccessResult);
292         assertEquals("Leader", prefixSuccessResult.getResult().getRole());
293
294         final InstanceIdentifier<People> peopleId = InstanceIdentifier.create(People.class);
295         Mockito.doReturn(PeopleModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(peopleId);
296
297         final RpcResult<GetPrefixShardRoleOutput> prefixFail =
298                 getPrefixShardRole(member1, peopleId, serializer);
299
300         verifyFailedRpcResult(prefixFail);
301     }
302
303     @Test
304     public void testGetPrefixShardRole() throws Exception {
305         String name = "testGetPrefixShardRole";
306         String moduleShardsConfig = "module-shards-default-member-1.conf";
307
308         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
309                 .moduleShardsConfig(moduleShardsConfig).build();
310
311         member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default");
312
313
314     }
315
316     @Test
317     public void testModuleShardLeaderMovement() throws Exception {
318         String name = "testModuleShardLeaderMovement";
319         String moduleShardsConfig = "module-shards-member1.conf";
320
321         final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
322                 .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
323         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
324                 .moduleShardsConfig(moduleShardsConfig).build();
325         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
326                 .moduleShardsConfig(moduleShardsConfig).build();
327
328         member1.waitForMembersUp("member-2", "member-3");
329         replicaNode2.waitForMembersUp("member-1");
330         replicaNode3.waitForMembersUp("member-1", "member-2");
331
332         doAddShardReplica(replicaNode2, "cars", "member-1");
333         doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
334
335         verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
336
337         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
338
339         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
340
341         doMakeShardLeaderLocal(member1, "cars", "member-1");
342         verifyRaftState(replicaNode2.configDataStore(), "cars",
343             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
344         verifyRaftState(replicaNode3.configDataStore(), "cars",
345             raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
346
347         doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
348         verifyRaftState(member1.configDataStore(), "cars",
349             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
350         verifyRaftState(replicaNode3.configDataStore(), "cars",
351             raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
352
353         replicaNode2.waitForMembersUp("member-3");
354         doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
355     }
356
357     @Test
358     public void testAddShardReplica() throws Exception {
359         String name = "testAddShardReplica";
360         String moduleShardsConfig = "module-shards-cars-member-1.conf";
361         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
362                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
363
364         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
365                 .moduleShardsConfig(moduleShardsConfig).build();
366
367         leaderNode1.waitForMembersUp("member-2");
368
369         doAddShardReplica(newReplicaNode2, "cars", "member-1");
370
371         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
372                 .moduleShardsConfig(moduleShardsConfig).build();
373
374         leaderNode1.waitForMembersUp("member-3");
375         newReplicaNode2.waitForMembersUp("member-3");
376
377         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
378
379         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
380         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
381
382         // Write data to member-2's config datastore and read/verify via member-3
383         final NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
384                 newReplicaNode3.configDataStore());
385
386         // Write data to member-3's oper datastore and read/verify via member-2
387         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
388
389         // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 -
390         // 2 ServerConfigurationPayload entries,  the transaction payload entry plus a purge payload.
391
392         RaftStateVerifier verifier = raftState -> {
393             assertEquals("Commit index", 4, raftState.getCommitIndex());
394             assertEquals("Last applied index", 4, raftState.getLastApplied());
395         };
396
397         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
398         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
399
400         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
401         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
402
403         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
404         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
405
406         // Restart member-3 and verify the cars config shard is re-instated.
407
408         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
409         newReplicaNode3.cleanup();
410
411         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
412                 .moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
413
414         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
415         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
416     }
417
418     @Test
419     public void testAddShardReplicaFailures() throws Exception {
420         String name = "testAddShardReplicaFailures";
421         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
422                 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
423
424         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
425                 memberNode.operDataStore(), null);
426
427         RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
428                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
429         verifyFailedRpcResult(rpcResult);
430
431         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars")
432                 .build()).get(10, TimeUnit.SECONDS);
433         verifyFailedRpcResult(rpcResult);
434
435         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people")
436                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
437         verifyFailedRpcResult(rpcResult);
438     }
439
440     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(final AbstractDataStore writeToStore,
441             final AbstractDataStore readFromStore) throws Exception {
442         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
443         NormalizedNode<?, ?> carsNode = CarsModel.create();
444         writeTx.write(CarsModel.BASE_PATH, carsNode);
445
446         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
447         Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
448         assertEquals("canCommit", TRUE, canCommit);
449         cohort.preCommit().get(5, TimeUnit.SECONDS);
450         cohort.commit().get(5, TimeUnit.SECONDS);
451
452         readCarsNodeAndVerify(readFromStore, carsNode);
453         return carsNode;
454     }
455
456     private static void readCarsNodeAndVerify(final AbstractDataStore readFromStore,
457             final NormalizedNode<?, ?> expCarsNode) throws Exception {
458         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction().read(CarsModel.BASE_PATH)
459                 .get(15, TimeUnit.SECONDS);
460         assertTrue("isPresent", optional.isPresent());
461         assertEquals("Data node", expCarsNode, optional.get());
462     }
463
464     private static RpcResult<GetShardRoleOutput> getShardRole(final MemberNode memberNode,
465             final BindingNormalizedNodeSerializer serializer, final String shardName) throws Exception {
466
467         final GetShardRoleInput input = new GetShardRoleInputBuilder()
468                 .setDataStoreType(DataStoreType.Config)
469                 .setShardName(shardName)
470                 .build();
471
472         final ClusterAdminRpcService service =
473                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
474
475         return service.getShardRole(input).get(10, TimeUnit.SECONDS);
476     }
477
478     private static RpcResult<GetPrefixShardRoleOutput> getPrefixShardRole(
479             final MemberNode memberNode,
480             final InstanceIdentifier<?> identifier,
481             final BindingNormalizedNodeSerializer serializer) throws Exception {
482
483         final GetPrefixShardRoleInput input = new GetPrefixShardRoleInputBuilder()
484                 .setDataStoreType(DataStoreType.Config)
485                 .setShardPrefix(identifier)
486                 .build();
487
488         final ClusterAdminRpcService service =
489                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
490
491         return service.getPrefixShardRole(input).get(10, TimeUnit.SECONDS);
492     }
493
494     private static void addPrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier<?> identifier,
495             final BindingNormalizedNodeSerializer serializer, final String shardName,
496             final String... peerMemberNames) throws Exception {
497
498         final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
499                 .setShardPrefix(identifier)
500                 .setDataStoreType(DataStoreType.Config).build();
501
502         final ClusterAdminRpcService service =
503                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
504
505         final RpcResult<AddPrefixShardReplicaOutput> rpcResult = service.addPrefixShardReplica(input)
506                 .get(10, TimeUnit.SECONDS);
507         verifySuccessfulRpcResult(rpcResult);
508
509         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
510         Optional<ActorRef> optional = memberNode.configDataStore().getActorUtils().findLocalShard(shardName);
511         assertTrue("Replica shard not present", optional.isPresent());
512     }
513
514     private static void removePrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier<?> identifier,
515             final String removeFromMember, final BindingNormalizedNodeSerializer serializer, final String shardName,
516             final String... peerMemberNames) throws Exception {
517         final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
518                 .setDataStoreType(DataStoreType.Config)
519                 .setShardPrefix(identifier)
520                 .setMemberName(removeFromMember).build();
521
522         final ClusterAdminRpcService service =
523                 new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
524
525         final RpcResult<RemovePrefixShardReplicaOutput> rpcResult = service.removePrefixShardReplica(input)
526                 .get(10, TimeUnit.SECONDS);
527         verifySuccessfulRpcResult(rpcResult);
528
529         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
530     }
531
532     private static void doAddShardReplica(final MemberNode memberNode, final String shardName,
533             final String... peerMemberNames) throws Exception {
534         memberNode.waitForMembersUp(peerMemberNames);
535
536         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
537                 memberNode.operDataStore(), null);
538
539         RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
540             .setShardName(shardName).setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
541         verifySuccessfulRpcResult(rpcResult);
542
543         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
544
545         Optional<ActorRef> optional = memberNode.operDataStore().getActorUtils().findLocalShard(shardName);
546         assertFalse("Oper shard present", optional.isPresent());
547
548         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
549                 .setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
550         verifySuccessfulRpcResult(rpcResult);
551
552         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
553     }
554
555     private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName,
556             final String newLeader) throws Exception {
557         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
558                 memberNode.operDataStore(), null);
559
560         final RpcResult<MakeLeaderLocalOutput> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
561                 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
562                 .get(10, TimeUnit.SECONDS);
563
564         verifySuccessfulRpcResult(rpcResult);
565
566         verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
567                 containsString(newLeader)));
568     }
569
570     private static <T> T verifySuccessfulRpcResult(final RpcResult<T> rpcResult) {
571         if (!rpcResult.isSuccessful()) {
572             if (rpcResult.getErrors().size() > 0) {
573                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
574                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
575             }
576
577             fail("Rpc failed with no error");
578         }
579
580         return rpcResult.getResult();
581     }
582
583     private static void verifyFailedRpcResult(final RpcResult<?> rpcResult) {
584         assertFalse("RpcResult", rpcResult.isSuccessful());
585         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
586         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
587         assertNotNull("RpcResult error message null", error.getMessage());
588     }
589
590     @Test
591     public void testRemoveShardReplica() throws Exception {
592         String name = "testRemoveShardReplica";
593         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
594         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
595                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
596                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
597                 .build();
598
599         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
600                 .moduleShardsConfig(moduleShardsConfig).build();
601
602         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
603                 .moduleShardsConfig(moduleShardsConfig).build();
604
605         leaderNode1.configDataStore().waitTillReady();
606         replicaNode3.configDataStore().waitTillReady();
607         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
608         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
609         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
610
611         // Invoke RPC service on member-3 to remove it's local shard
612
613         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
614                 replicaNode3.operDataStore(), null);
615
616         RpcResult<RemoveShardReplicaOutput> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
617                 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
618                 .get(10, TimeUnit.SECONDS);
619         verifySuccessfulRpcResult(rpcResult);
620
621         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
622         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
623         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
624
625         // Restart member-2 and verify member-3 isn't present.
626
627         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
628         replicaNode2.cleanup();
629
630         MemberNode newPeplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
631                 .moduleShardsConfig(moduleShardsConfig).build();
632
633         newPeplicaNode2.configDataStore().waitTillReady();
634         verifyRaftPeersPresent(newPeplicaNode2.configDataStore(), "cars", "member-1");
635
636         // Invoke RPC service on member-1 to remove member-2
637
638         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
639                 leaderNode1.operDataStore(), null);
640
641         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
642                 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
643         verifySuccessfulRpcResult(rpcResult);
644
645         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
646         verifyNoShardPresent(newPeplicaNode2.configDataStore(), "cars");
647     }
648
649     @Test
650     public void testRemoveShardLeaderReplica() throws Exception {
651         String name = "testRemoveShardLeaderReplica";
652         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
653         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
654                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
655                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
656                 .build();
657
658         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
659                 .moduleShardsConfig(moduleShardsConfig).build();
660
661         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
662                 .moduleShardsConfig(moduleShardsConfig).build();
663
664         leaderNode1.configDataStore().waitTillReady();
665         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
666         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
667         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
668
669         replicaNode2.waitForMembersUp("member-1", "member-3");
670         replicaNode3.waitForMembersUp("member-1", "member-2");
671
672         // Invoke RPC service on leader member-1 to remove it's local shard
673
674         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
675                 leaderNode1.operDataStore(), null);
676
677         RpcResult<RemoveShardReplicaOutput> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
678                 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
679                 .get(10, TimeUnit.SECONDS);
680         verifySuccessfulRpcResult(rpcResult);
681
682         verifyRaftState(replicaNode2.configDataStore(), "cars", raftState ->
683                 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
684                         containsString("member-3"))));
685
686         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
687         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
688         verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
689     }
690
691     @Test
692     public void testAddReplicasForAllShards() throws Exception {
693         String name = "testAddReplicasForAllShards";
694         String moduleShardsConfig = "module-shards-member1.conf";
695         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
696                 .moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
697
698         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
699                                                                                  "pets", null,
700                                                                                  Collections.singletonList(MEMBER_1));
701         leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
702                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
703         leaderNode1.kit().expectMsgClass(Success.class);
704         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorUtils(), "pets");
705
706         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
707                 .moduleShardsConfig(moduleShardsConfig).build();
708
709         leaderNode1.waitForMembersUp("member-2");
710         newReplicaNode2.waitForMembersUp("member-1");
711
712         newReplicaNode2.configDataStore().getActorUtils().getShardManager().tell(
713                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
714         newReplicaNode2.kit().expectMsgClass(Success.class);
715
716         newReplicaNode2.operDataStore().getActorUtils().getShardManager().tell(
717                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
718                                                              "no-leader", null,
719                                                              Collections.singletonList(MEMBER_1)),
720                                 Shard.builder(), null),
721                                 newReplicaNode2.kit().getRef());
722         newReplicaNode2.kit().expectMsgClass(Success.class);
723
724         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
725                 newReplicaNode2.operDataStore(), null);
726
727         RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards(
728             new AddReplicasForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
729         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
730         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
731                 successShardResult("people", DataStoreType.Config),
732                 successShardResult("pets", DataStoreType.Config),
733                 successShardResult("cars", DataStoreType.Operational),
734                 successShardResult("people", DataStoreType.Operational),
735                 failedShardResult("no-leader", DataStoreType.Operational));
736
737         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
738         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
739         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
740         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
741         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
742     }
743
744     @Test
745     public void testRemoveAllShardReplicas() throws Exception {
746         String name = "testRemoveAllShardReplicas";
747         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
748         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
749                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
750                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
751                 .build();
752
753         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
754                 .moduleShardsConfig(moduleShardsConfig).build();
755
756         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
757                 .moduleShardsConfig(moduleShardsConfig).build();
758
759         leaderNode1.configDataStore().waitTillReady();
760         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
761         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
762         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
763
764         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
765                 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
766         leaderNode1.configDataStore().getActorUtils().getShardManager().tell(
767                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
768         leaderNode1.kit().expectMsgClass(Success.class);
769
770         replicaNode2.configDataStore().getActorUtils().getShardManager().tell(
771                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
772         replicaNode2.kit().expectMsgClass(Success.class);
773
774         replicaNode3.configDataStore().getActorUtils().getShardManager().tell(
775                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
776         replicaNode3.kit().expectMsgClass(Success.class);
777
778         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
779         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
780         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
781
782         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
783                 replicaNode3.operDataStore(), null);
784
785         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
786                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
787         RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
788         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
789                 successShardResult("people", DataStoreType.Config),
790                 successShardResult("pets", DataStoreType.Config),
791                 successShardResult("cars", DataStoreType.Operational),
792                 successShardResult("people", DataStoreType.Operational));
793
794         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
795         verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
796         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
797         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
798         verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
799         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
800         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
801         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
802         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
803     }
804
805     @Test
806     public void testChangeMemberVotingStatesForShard() throws Exception {
807         String name = "testChangeMemberVotingStatusForShard";
808         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
809         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
810                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
811                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
812                 .build();
813
814         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
815                 .moduleShardsConfig(moduleShardsConfig).build();
816
817         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
818                 .moduleShardsConfig(moduleShardsConfig).build();
819
820         leaderNode1.configDataStore().waitTillReady();
821         replicaNode3.configDataStore().waitTillReady();
822         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
823         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
824         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
825
826         // Invoke RPC service on member-3 to change voting status
827
828         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
829                 replicaNode3.operDataStore(), null);
830
831         RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service3
832                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
833                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
834                         .setMemberVotingState(ImmutableList.of(
835                                 new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
836                                 new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build()))
837                         .build())
838                 .get(10, TimeUnit.SECONDS);
839         verifySuccessfulRpcResult(rpcResult);
840
841         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
842                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
843         verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
844                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
845         verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
846                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
847     }
848
849     @Test
850     public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
851         String name = "testChangeMemberVotingStatesForSingleNodeShard";
852         String moduleShardsConfig = "module-shards-member1.conf";
853         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
854                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
855                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
856                 .build();
857
858         leaderNode.configDataStore().waitTillReady();
859
860         // Invoke RPC service on member-3 to change voting status
861
862         ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
863                 leaderNode.operDataStore(), null);
864
865         RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service
866                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
867                         .setShardName("cars").setDataStoreType(DataStoreType.Config)
868                         .setMemberVotingState(ImmutableList
869                                 .of(new MemberVotingStateBuilder().setMemberName("member-1").setVoting(FALSE).build()))
870                         .build())
871                 .get(10, TimeUnit.SECONDS);
872         verifyFailedRpcResult(rpcResult);
873
874         verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE));
875     }
876
877     @Test
878     public void testChangeMemberVotingStatesForAllShards() throws Exception {
879         String name = "testChangeMemberVotingStatesForAllShards";
880         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
881         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
882                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
883                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
884                 .build();
885
886         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
887                 .moduleShardsConfig(moduleShardsConfig).build();
888
889         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
890                 .moduleShardsConfig(moduleShardsConfig).build();
891
892         leaderNode1.configDataStore().waitTillReady();
893         leaderNode1.operDataStore().waitTillReady();
894         replicaNode3.configDataStore().waitTillReady();
895         replicaNode3.operDataStore().waitTillReady();
896         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
897         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
898         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
899
900         // Invoke RPC service on member-3 to change voting status
901
902         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
903                 replicaNode3.operDataStore(), null);
904
905         RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
906                 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
907                         new MemberVotingStateBuilder().setMemberName("member-2").setVoting(FALSE).build(),
908                         new MemberVotingStateBuilder().setMemberName("member-3").setVoting(FALSE).build())).build())
909                 .get(10, TimeUnit.SECONDS);
910         ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
911         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
912                 successShardResult("people", DataStoreType.Config),
913                 successShardResult("cars", DataStoreType.Operational),
914                 successShardResult("people", DataStoreType.Operational));
915
916         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
917                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
918                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
919                 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", TRUE),
920                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE));
921     }
922
923     @Test
924     public void testFlipMemberVotingStates() throws Exception {
925         String name = "testFlipMemberVotingStates";
926
927         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
928                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
929                 new ServerInfo("member-3", false)));
930
931         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
932         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
933         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
934
935         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
936         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
937                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder()
938                         .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10))
939                 .build();
940
941         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
942                 .moduleShardsConfig(moduleShardsConfig).build();
943
944         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
945                 .moduleShardsConfig(moduleShardsConfig).build();
946
947         leaderNode1.configDataStore().waitTillReady();
948         leaderNode1.operDataStore().waitTillReady();
949         replicaNode3.configDataStore().waitTillReady();
950         replicaNode3.operDataStore().waitTillReady();
951         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
952                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
953
954         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
955                 replicaNode3.operDataStore(), null);
956
957         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards(
958             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
959         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
960         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
961                 successShardResult("people", DataStoreType.Config),
962                 successShardResult("cars", DataStoreType.Operational),
963                 successShardResult("people", DataStoreType.Operational));
964
965         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
966                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
967                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
968                 new String[]{"cars", "people"},
969                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
970                 new SimpleEntry<>("member-3", TRUE));
971
972         // Leadership should have transferred to member 3 since it is the only remaining voting member.
973         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
974             assertNotNull("Expected non-null leader Id", raftState.getLeader());
975             assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
976                     raftState.getLeader().contains("member-3"));
977         });
978
979         verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
980             assertNotNull("Expected non-null leader Id", raftState.getLeader());
981             assertTrue("Expected leader member-3. Actual: " + raftState.getLeader(),
982                     raftState.getLeader().contains("member-3"));
983         });
984
985         // Flip the voting states back to the original states.
986
987         rpcResult = service3.flipMemberVotingStatesForAllShards(
988             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
989         result = verifySuccessfulRpcResult(rpcResult);
990         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
991                 successShardResult("people", DataStoreType.Config),
992                 successShardResult("cars", DataStoreType.Operational),
993                 successShardResult("people", DataStoreType.Operational));
994
995         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
996                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
997                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
998                 new String[]{"cars", "people"},
999                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
1000                 new SimpleEntry<>("member-3", FALSE));
1001
1002         // Leadership should have transferred to member 1 or 2.
1003         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1004             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1005             assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
1006                     raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
1007         });
1008     }
1009
1010     @Test
1011     public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
1012         String name = "testFlipMemberVotingStatesWithNoInitialLeader";
1013
1014         // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
1015         // non-voting and simulated as down by not starting them up.
1016         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1017                 new ServerInfo("member-1", false), new ServerInfo("member-2", false),
1018                 new ServerInfo("member-3", false), new ServerInfo("member-4", true),
1019                 new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
1020
1021         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
1022         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
1023         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
1024
1025         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
1026         final MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
1027                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
1028                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
1029                 .build();
1030
1031         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
1032                 .moduleShardsConfig(moduleShardsConfig).build();
1033
1034         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
1035                 .moduleShardsConfig(moduleShardsConfig).build();
1036
1037         // Initially there won't be a leader b/c all the up nodes are non-voting.
1038
1039         replicaNode1.waitForMembersUp("member-2", "member-3");
1040
1041         verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", FALSE),
1042                 new SimpleEntry<>("member-2", FALSE), new SimpleEntry<>("member-3", FALSE),
1043                 new SimpleEntry<>("member-4", TRUE), new SimpleEntry<>("member-5", TRUE),
1044                 new SimpleEntry<>("member-6", TRUE));
1045
1046         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
1047             assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
1048
1049         ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
1050                 replicaNode1.operDataStore(), null);
1051
1052         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
1053             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
1054         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1055         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1056                 successShardResult("people", DataStoreType.Config),
1057                 successShardResult("cars", DataStoreType.Operational),
1058                 successShardResult("people", DataStoreType.Operational));
1059
1060         verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
1061                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1062                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1063                 new String[]{"cars", "people"},
1064                 new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE),
1065                 new SimpleEntry<>("member-3", TRUE), new SimpleEntry<>("member-4", FALSE),
1066                 new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE));
1067
1068         // Since member 1 was changed to voting and there was no leader, it should've started and election
1069         // and become leader
1070         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
1071             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1072             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
1073                     raftState.getLeader().contains("member-1"));
1074         });
1075
1076         verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
1077             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1078             assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
1079                     raftState.getLeader().contains("member-1"));
1080         });
1081     }
1082
1083     @Test
1084     public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
1085         String name = "testFlipMemberVotingStatesWithVotingMembersDown";
1086
1087         // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
1088         ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
1089                 new ServerInfo("member-1", true), new ServerInfo("member-2", true),
1090                 new ServerInfo("member-3", true), new ServerInfo("member-4", false),
1091                 new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
1092
1093         setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
1094         setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
1095         setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
1096
1097         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
1098         final MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
1099                 .moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(
1100                         DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1))
1101                 .build();
1102
1103         final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
1104                 .moduleShardsConfig(moduleShardsConfig).build();
1105
1106         final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
1107                 .moduleShardsConfig(moduleShardsConfig).build();
1108
1109         leaderNode1.configDataStore().waitTillReady();
1110         leaderNode1.operDataStore().waitTillReady();
1111         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
1112                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", TRUE),
1113                 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
1114                 new SimpleEntry<>("member-6", FALSE));
1115
1116         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
1117                 leaderNode1.operDataStore(), null);
1118
1119         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
1120             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
1121         FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
1122         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
1123                 successShardResult("people", DataStoreType.Config),
1124                 successShardResult("cars", DataStoreType.Operational),
1125                 successShardResult("people", DataStoreType.Operational));
1126
1127         // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
1128         verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
1129                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
1130                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
1131                 new String[]{"cars", "people"},
1132                 new SimpleEntry<>("member-1", FALSE), new SimpleEntry<>("member-2", FALSE),
1133                 new SimpleEntry<>("member-3", FALSE), new SimpleEntry<>("member-4", TRUE),
1134                 new SimpleEntry<>("member-5", TRUE), new SimpleEntry<>("member-6", TRUE));
1135
1136         // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
1137         // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
1138         verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
1139             assertNotNull("Expected non-null leader Id", raftState.getLeader());
1140             assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
1141         });
1142     }
1143
1144     private static void setupPersistedServerConfigPayload(final ServerConfigurationPayload serverConfig,
1145             final String member, final String datastoreTypeSuffix, final String... shards) {
1146         String[] datastoreTypes = {"config_", "oper_"};
1147         for (String type : datastoreTypes) {
1148             for (String shard : shards) {
1149                 List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
1150                 for (ServerInfo info : serverConfig.getServerConfig()) {
1151                     newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
1152                             type + datastoreTypeSuffix).toString(), info.isVoting()));
1153                 }
1154
1155                 String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
1156                         type + datastoreTypeSuffix).toString();
1157                 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
1158                 InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,
1159                         new ServerConfigurationPayload(newServerInfo)));
1160             }
1161         }
1162     }
1163
1164     @SafeVarargs
1165     private static void verifyVotingStates(final AbstractDataStore[] datastores, final String[] shards,
1166             final SimpleEntry<String, Boolean>... expStates) throws Exception {
1167         for (AbstractDataStore datastore: datastores) {
1168             for (String shard: shards) {
1169                 verifyVotingStates(datastore, shard, expStates);
1170             }
1171         }
1172     }
1173
1174     @SafeVarargs
1175     private static void verifyVotingStates(final AbstractDataStore datastore, final String shardName,
1176             final SimpleEntry<String, Boolean>... expStates) throws Exception {
1177         String localMemberName = datastore.getActorUtils().getCurrentMemberName().getName();
1178         Map<String, Boolean> expStateMap = new HashMap<>();
1179         for (Entry<String, Boolean> e: expStates) {
1180             expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
1181                     datastore.getActorUtils().getDataStoreName()).toString(), e.getValue());
1182         }
1183
1184         verifyRaftState(datastore, shardName, raftState -> {
1185             String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
1186                     datastore.getActorUtils().getDataStoreName()).toString();
1187             assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
1188             for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1189                 assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
1190             }
1191         });
1192     }
1193
1194     private static void verifyShardResults(final Map<ShardResultKey, ShardResult> shardResults,
1195             final ShardResult... expShardResults) {
1196         Map<String, ShardResult> expResultsMap = new HashMap<>();
1197         for (ShardResult r: expShardResults) {
1198             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
1199         }
1200
1201         for (ShardResult result: shardResults.values()) {
1202             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
1203             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
1204                     result.getDataStoreType()), exp);
1205             assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
1206             if (exp.isSucceeded()) {
1207                 assertNull("Expected null error message", result.getErrorMessage());
1208             } else {
1209                 assertNotNull("Expected error message", result.getErrorMessage());
1210             }
1211         }
1212
1213         if (!expResultsMap.isEmpty()) {
1214             fail("Missing shard results for " + expResultsMap.keySet());
1215         }
1216     }
1217
1218     private static ShardResult successShardResult(final String shardName, final DataStoreType type) {
1219         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(TRUE).build();
1220     }
1221
1222     private static ShardResult failedShardResult(final String shardName, final DataStoreType type) {
1223         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(FALSE).build();
1224     }
1225 }