6e16dcfb097ddeffcf2fd0e740a00fd4e1546b7c
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static org.hamcrest.CoreMatchers.anyOf;
11 import static org.hamcrest.CoreMatchers.containsString;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertThat;
16 import static org.junit.Assert.fail;
17 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
18 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
19 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
20 import akka.actor.ActorRef;
21 import akka.actor.PoisonPill;
22 import akka.actor.Status.Success;
23 import akka.cluster.Cluster;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Iterables;
27 import com.google.common.collect.Sets;
28 import java.io.File;
29 import java.io.FileInputStream;
30 import java.net.URI;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.TimeUnit;
39 import org.apache.commons.lang3.SerializationUtils;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
44 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
45 import org.opendaylight.controller.cluster.datastore.MemberNode;
46 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
47 import org.opendaylight.controller.cluster.datastore.Shard;
48 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
49 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
50 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
51 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
52 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
53 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
54 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
64 import org.opendaylight.yangtools.yang.common.RpcError;
65 import org.opendaylight.yangtools.yang.common.RpcResult;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67
68 /**
69  * Unit tests for ClusterAdminRpcService.
70  *
71  * @author Thomas Pantelis
72  */
73 public class ClusterAdminRpcServiceTest {
74     private final List<MemberNode> memberNodes = new ArrayList<>();
75
76     @Before
77     public void setUp() {
78         InMemoryJournal.clear();
79         InMemorySnapshotStore.clear();
80     }
81
82     @After
83     public void tearDown() {
84         for(MemberNode m: memberNodes) {
85             m.cleanup();
86         }
87     }
88
89     @Test
90     public void testBackupDatastore() throws Exception {
91         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1").
92                 moduleShardsConfig("module-shards-member1.conf").
93                 waitForShardLeader("cars", "people").testName("testBackupDatastore").build();
94
95         String fileName = "target/testBackupDatastore";
96         new File(fileName).delete();
97
98         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore());
99
100         RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder().
101                 setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
102         verifySuccessfulRpcResult(rpcResult);
103
104         try(FileInputStream fis = new FileInputStream(fileName)) {
105             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
106             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
107
108             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
109                     snapshots.get(1).getType(), snapshots.get(1));
110             verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreType(),
111                     map.get(node.configDataStore().getActorContext().getDataStoreType()), "cars", "people");
112         } finally {
113             new File(fileName).delete();
114         }
115
116         // Test failure by killing a shard.
117
118         node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder().
119                 shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
120
121         ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
122         node.kit().watch(carsShardActor);
123         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
124         node.kit().expectTerminated(carsShardActor);
125
126         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()).
127                 get(5, TimeUnit.SECONDS);
128         assertEquals("isSuccessful", false, rpcResult.isSuccessful());
129         assertEquals("getErrors", 1, rpcResult.getErrors().size());
130
131         service.close();
132     }
133
134     private void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
135         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
136         Set<String> shardNames = new HashSet<>();
137         for(DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
138             shardNames.add(s.getName());
139         }
140
141         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
142     }
143
144     @Test
145     public void testAddShardReplica() throws Exception {
146         String name = "testAddShardReplica";
147         String moduleShardsConfig = "module-shards-cars-member-1.conf";
148         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
149                 moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
150
151         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
152                 moduleShardsConfig(moduleShardsConfig).build();
153
154         leaderNode1.waitForMembersUp("member-2");
155
156         doAddShardReplica(newReplicaNode2, "cars", "member-1");
157
158         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
159                 moduleShardsConfig(moduleShardsConfig).build();
160
161         leaderNode1.waitForMembersUp("member-3");
162         newReplicaNode2.waitForMembersUp("member-3");
163
164         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
165
166         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
167         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
168
169         // Write data to member-2's config datastore and read/verify via member-3
170         NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
171                 newReplicaNode3.configDataStore());
172
173         // Write data to member-3's oper datastore and read/verify via member-2
174         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
175
176         // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
177         // 2 ServerConfigurationPayload entries and the transaction payload entry.
178
179         RaftStateVerifier verifier = new RaftStateVerifier() {
180             @Override
181             public void verify(OnDemandRaftState raftState) {
182                 assertEquals("Commit index", 2, raftState.getCommitIndex());
183                 assertEquals("Last applied index", 2, raftState.getLastApplied());
184             }
185         };
186
187         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
188         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
189
190         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
191         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
192
193         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
194         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
195
196         // Restart member-3 and verify the cars config shard is re-instated.
197
198         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
199         newReplicaNode3.cleanup();
200
201         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
202                 moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
203
204         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
205         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
206     }
207
208     @Test
209     public void testAddShardReplicaFailures() throws Exception {
210         String name = "testAddShardReplicaFailures";
211         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
212                 moduleShardsConfig("module-shards-cars-member-1.conf").build();
213
214         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
215                 memberNode.operDataStore());
216
217         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().
218                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
219         verifyFailedRpcResult(rpcResult);
220
221         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars").
222                 build()).get(10, TimeUnit.SECONDS);
223         verifyFailedRpcResult(rpcResult);
224
225         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people").
226                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
227         verifyFailedRpcResult(rpcResult);
228
229         service.close();
230     }
231
232     private NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
233             DistributedDataStore readFromStore) throws Exception {
234         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
235         NormalizedNode<?, ?> carsNode = CarsModel.create();
236         writeTx.write(CarsModel.BASE_PATH, carsNode);
237
238         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
239         Boolean canCommit = cohort .canCommit().get(7, TimeUnit.SECONDS);
240         assertEquals("canCommit", true, canCommit);
241         cohort.preCommit().get(5, TimeUnit.SECONDS);
242         cohort.commit().get(5, TimeUnit.SECONDS);
243
244         readCarsNodeAndVerify(readFromStore, carsNode);
245         return carsNode;
246     }
247
248     private void readCarsNodeAndVerify(DistributedDataStore readFromStore,
249             NormalizedNode<?, ?> expCarsNode) throws Exception {
250         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction().
251                 read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
252         assertEquals("isPresent", true, optional.isPresent());
253         assertEquals("Data node", expCarsNode, optional.get());
254     }
255
256     private void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
257             throws Exception {
258         memberNode.waitForMembersUp(peerMemberNames);
259
260         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
261                 memberNode.operDataStore());
262
263         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
264                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
265         verifySuccessfulRpcResult(rpcResult);
266
267         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
268
269         Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
270         assertEquals("Oper shard present", false, optional.isPresent());
271
272         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
273                 setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
274         verifySuccessfulRpcResult(rpcResult);
275
276         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
277
278         service.close();
279     }
280
281     private <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
282         if(!rpcResult.isSuccessful()) {
283             if(rpcResult.getErrors().size() > 0) {
284                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
285                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
286             }
287
288             fail("Rpc failed with no error");
289         }
290
291         return rpcResult.getResult();
292     }
293
294     private void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
295         assertEquals("RpcResult", false, rpcResult.isSuccessful());
296         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
297         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
298         assertNotNull("RpcResult error message null", error.getMessage());
299     }
300
301     @Test
302     public void testRemoveShardReplica() throws Exception {
303         String name = "testRemoveShardReplicaLocal";
304         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
305         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
306                 moduleShardsConfig(moduleShardsConfig).
307                 datastoreContextBuilder(DatastoreContext.newBuilder().
308                         shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
309
310         MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
311                 moduleShardsConfig(moduleShardsConfig).build();
312
313         MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
314                 moduleShardsConfig(moduleShardsConfig).build();
315
316         leaderNode1.configDataStore().waitTillReady();
317         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
318         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
319         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
320
321         // Invoke RPC service on member-3 to remove it's local shard
322
323         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
324                 replicaNode3.operDataStore());
325
326         RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder().
327                 setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()).
328                         get(10, TimeUnit.SECONDS);
329         verifySuccessfulRpcResult(rpcResult);
330         service3.close();
331
332         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
333         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
334         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
335
336         // Restart member-2 and verify member-3 isn't present.
337
338         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
339         replicaNode2.cleanup();
340
341         replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
342                 moduleShardsConfig(moduleShardsConfig).build();
343
344         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
345
346         // Invoke RPC service on member-1 to remove member-2
347
348         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
349                 leaderNode1.operDataStore());
350
351         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().
352                 setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).
353                         get(10, TimeUnit.SECONDS);
354         verifySuccessfulRpcResult(rpcResult);
355         service1.close();
356
357         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
358         verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
359     }
360
361     @Test
362     public void testRemoveShardLeaderReplica() throws Exception {
363         String name = "testRemoveShardLeaderReplica";
364         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
365         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
366                 moduleShardsConfig(moduleShardsConfig).
367                 datastoreContextBuilder(DatastoreContext.newBuilder().
368                         shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
369
370         MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
371                 moduleShardsConfig(moduleShardsConfig).build();
372
373         MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
374                 moduleShardsConfig(moduleShardsConfig).build();
375
376         leaderNode1.configDataStore().waitTillReady();
377         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
378         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
379         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
380
381         replicaNode2.waitForMembersUp("member-1", "member-3");
382         replicaNode2.waitForMembersUp("member-1", "member-2");
383
384         // Invoke RPC service on leader member-1 to remove it's local shard
385
386         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
387                 leaderNode1.operDataStore());
388
389         RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().
390                 setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()).
391                         get(10, TimeUnit.SECONDS);
392         verifySuccessfulRpcResult(rpcResult);
393         service1.close();
394
395         verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() {
396             @Override
397             public void verify(OnDemandRaftState raftState) {
398                 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
399                         containsString("member-3")));
400             }
401         });
402
403         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
404         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
405         verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
406     }
407
408     @Test
409     public void testAddReplicasForAllShards() throws Exception {
410         String name = "testAddReplicasForAllShards";
411         String moduleShardsConfig = "module-shards-member1.conf";
412         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
413                 moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
414
415         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
416                 "pets", null, Arrays.asList("member-1"));
417         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
418                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
419         leaderNode1.kit().expectMsgClass(Success.class);
420         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
421
422         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
423                 moduleShardsConfig(moduleShardsConfig).build();
424
425         leaderNode1.waitForMembersUp("member-2");
426         newReplicaNode2.waitForMembersUp("member-1");
427
428         newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
429                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
430         newReplicaNode2.kit().expectMsgClass(Success.class);
431
432         newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
433                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
434                         "no-leader", null, Arrays.asList("member-1")), Shard.builder(), null),
435                                 newReplicaNode2.kit().getRef());
436         newReplicaNode2.kit().expectMsgClass(Success.class);
437
438         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
439                 newReplicaNode2.operDataStore());
440
441         RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
442         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
443         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
444                 successShardResult("people", DataStoreType.Config),
445                 successShardResult("pets", DataStoreType.Config),
446                 successShardResult("cars", DataStoreType.Operational),
447                 successShardResult("people", DataStoreType.Operational),
448                 failedShardResult("no-leader", DataStoreType.Operational));
449
450         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
451         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
452         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
453         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
454         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
455
456         service.close();
457     }
458
459     @Test
460     public void testRemoveAllShardReplicas() {
461         // TODO implement
462     }
463
464     @Test
465     public void testConvertMembersToVotingForAllShards() {
466         // TODO implement
467     }
468
469     @Test
470     public void testConvertMembersToNonvotingForAllShards() {
471         // TODO implement
472     }
473
474     private void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
475         Map<String, ShardResult> expResultsMap = new HashMap<>();
476         for(ShardResult r: expShardResults) {
477             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
478         }
479
480         for(ShardResult result: shardResults) {
481             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
482             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
483                     result.getDataStoreType()), exp);
484             assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
485             if(exp.isSucceeded()) {
486                 assertNull("Expected null error message", result.getErrorMessage());
487             } else {
488                 assertNotNull("Expected error message", result.getErrorMessage());
489             }
490         }
491
492         if(!expResultsMap.isEmpty()) {
493             fail("Missing shard results for " + expResultsMap.keySet());
494         }
495     }
496
497     private ShardResult successShardResult(String shardName, DataStoreType type) {
498         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(true).build();
499     }
500
501     private ShardResult failedShardResult(String shardName, DataStoreType type) {
502         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(false).build();
503     }
504 }