BUG-5280: use MemberName instead of String
[controller.git] / opendaylight / md-sal / sal-cluster-admin / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static org.hamcrest.CoreMatchers.anyOf;
11 import static org.hamcrest.CoreMatchers.containsString;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertThat;
16 import static org.junit.Assert.fail;
17 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
18 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
19 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
20 import akka.actor.ActorRef;
21 import akka.actor.PoisonPill;
22 import akka.actor.Status.Success;
23 import akka.cluster.Cluster;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.Iterables;
27 import com.google.common.collect.Sets;
28 import java.io.File;
29 import java.io.FileInputStream;
30 import java.net.URI;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.TimeUnit;
39 import org.apache.commons.lang3.SerializationUtils;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.access.concepts.MemberName;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
46 import org.opendaylight.controller.cluster.datastore.MemberNode;
47 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
48 import org.opendaylight.controller.cluster.datastore.Shard;
49 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
50 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
51 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
52 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
53 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
54 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
55 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
57 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
67 import org.opendaylight.yangtools.yang.common.RpcError;
68 import org.opendaylight.yangtools.yang.common.RpcResult;
69 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
70
71 /**
72  * Unit tests for ClusterAdminRpcService.
73  *
74  * @author Thomas Pantelis
75  */
76 public class ClusterAdminRpcServiceTest {
77     private static final MemberName MEMBER_1 = MemberName.forName("member-1");
78     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
79     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
80     private final List<MemberNode> memberNodes = new ArrayList<>();
81
82     @Before
83     public void setUp() {
84         InMemoryJournal.clear();
85         InMemorySnapshotStore.clear();
86     }
87
88     @After
89     public void tearDown() {
90         for(MemberNode m: memberNodes) {
91             m.cleanup();
92         }
93     }
94
95     @Test
96     public void testBackupDatastore() throws Exception {
97         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1").
98                 moduleShardsConfig("module-shards-member1.conf").
99                 waitForShardLeader("cars", "people").testName("testBackupDatastore").build();
100
101         String fileName = "target/testBackupDatastore";
102         new File(fileName).delete();
103
104         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore());
105
106         RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder().
107                 setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
108         verifySuccessfulRpcResult(rpcResult);
109
110         try(FileInputStream fis = new FileInputStream(fileName)) {
111             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
112             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
113
114             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
115                     snapshots.get(1).getType(), snapshots.get(1));
116             verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreName(),
117                     map.get(node.configDataStore().getActorContext().getDataStoreName()), "cars", "people");
118         } finally {
119             new File(fileName).delete();
120         }
121
122         // Test failure by killing a shard.
123
124         node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder().
125                 shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
126
127         ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
128         node.kit().watch(carsShardActor);
129         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
130         node.kit().expectTerminated(carsShardActor);
131
132         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()).
133                 get(5, TimeUnit.SECONDS);
134         assertEquals("isSuccessful", false, rpcResult.isSuccessful());
135         assertEquals("getErrors", 1, rpcResult.getErrors().size());
136
137         service.close();
138     }
139
140     private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
141         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
142         Set<String> shardNames = new HashSet<>();
143         for(DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
144             shardNames.add(s.getName());
145         }
146
147         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
148     }
149
150     @Test
151     public void testAddShardReplica() throws Exception {
152         String name = "testAddShardReplica";
153         String moduleShardsConfig = "module-shards-cars-member-1.conf";
154         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
155                 moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
156
157         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
158                 moduleShardsConfig(moduleShardsConfig).build();
159
160         leaderNode1.waitForMembersUp("member-2");
161
162         doAddShardReplica(newReplicaNode2, "cars", "member-1");
163
164         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
165                 moduleShardsConfig(moduleShardsConfig).build();
166
167         leaderNode1.waitForMembersUp("member-3");
168         newReplicaNode2.waitForMembersUp("member-3");
169
170         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
171
172         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
173         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
174
175         // Write data to member-2's config datastore and read/verify via member-3
176         NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
177                 newReplicaNode3.configDataStore());
178
179         // Write data to member-3's oper datastore and read/verify via member-2
180         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
181
182         // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
183         // 2 ServerConfigurationPayload entries and the transaction payload entry.
184
185         RaftStateVerifier verifier = new RaftStateVerifier() {
186             @Override
187             public void verify(OnDemandRaftState raftState) {
188                 assertEquals("Commit index", 2, raftState.getCommitIndex());
189                 assertEquals("Last applied index", 2, raftState.getLastApplied());
190             }
191         };
192
193         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
194         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
195
196         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
197         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
198
199         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
200         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
201
202         // Restart member-3 and verify the cars config shard is re-instated.
203
204         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
205         newReplicaNode3.cleanup();
206
207         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
208                 moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
209
210         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
211         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
212     }
213
214     @Test
215     public void testAddShardReplicaFailures() throws Exception {
216         String name = "testAddShardReplicaFailures";
217         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
218                 moduleShardsConfig("module-shards-cars-member-1.conf").build();
219
220         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
221                 memberNode.operDataStore());
222
223         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().
224                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
225         verifyFailedRpcResult(rpcResult);
226
227         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars").
228                 build()).get(10, TimeUnit.SECONDS);
229         verifyFailedRpcResult(rpcResult);
230
231         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people").
232                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
233         verifyFailedRpcResult(rpcResult);
234
235         service.close();
236     }
237
238     private static NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
239             DistributedDataStore readFromStore) throws Exception {
240         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
241         NormalizedNode<?, ?> carsNode = CarsModel.create();
242         writeTx.write(CarsModel.BASE_PATH, carsNode);
243
244         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
245         Boolean canCommit = cohort .canCommit().get(7, TimeUnit.SECONDS);
246         assertEquals("canCommit", true, canCommit);
247         cohort.preCommit().get(5, TimeUnit.SECONDS);
248         cohort.commit().get(5, TimeUnit.SECONDS);
249
250         readCarsNodeAndVerify(readFromStore, carsNode);
251         return carsNode;
252     }
253
254     private static void readCarsNodeAndVerify(DistributedDataStore readFromStore,
255             NormalizedNode<?, ?> expCarsNode) throws Exception {
256         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction().
257                 read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
258         assertEquals("isPresent", true, optional.isPresent());
259         assertEquals("Data node", expCarsNode, optional.get());
260     }
261
262     private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
263             throws Exception {
264         memberNode.waitForMembersUp(peerMemberNames);
265
266         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
267                 memberNode.operDataStore());
268
269         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
270                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
271         verifySuccessfulRpcResult(rpcResult);
272
273         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
274
275         Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
276         assertEquals("Oper shard present", false, optional.isPresent());
277
278         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
279                 setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
280         verifySuccessfulRpcResult(rpcResult);
281
282         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
283
284         service.close();
285     }
286
287     private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
288         if(!rpcResult.isSuccessful()) {
289             if(rpcResult.getErrors().size() > 0) {
290                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
291                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
292             }
293
294             fail("Rpc failed with no error");
295         }
296
297         return rpcResult.getResult();
298     }
299
300     private static void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
301         assertEquals("RpcResult", false, rpcResult.isSuccessful());
302         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
303         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
304         assertNotNull("RpcResult error message null", error.getMessage());
305     }
306
307     @Test
308     public void testRemoveShardReplica() throws Exception {
309         String name = "testRemoveShardReplicaLocal";
310         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
311         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
312                 moduleShardsConfig(moduleShardsConfig).
313                 datastoreContextBuilder(DatastoreContext.newBuilder().
314                         shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
315
316         MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
317                 moduleShardsConfig(moduleShardsConfig).build();
318
319         MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
320                 moduleShardsConfig(moduleShardsConfig).build();
321
322         leaderNode1.configDataStore().waitTillReady();
323         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
324         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
325         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
326
327         // Invoke RPC service on member-3 to remove it's local shard
328
329         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
330                 replicaNode3.operDataStore());
331
332         RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder().
333                 setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()).
334                         get(10, TimeUnit.SECONDS);
335         verifySuccessfulRpcResult(rpcResult);
336         service3.close();
337
338         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
339         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
340         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
341
342         // Restart member-2 and verify member-3 isn't present.
343
344         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
345         replicaNode2.cleanup();
346
347         replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
348                 moduleShardsConfig(moduleShardsConfig).build();
349
350         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
351
352         // Invoke RPC service on member-1 to remove member-2
353
354         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
355                 leaderNode1.operDataStore());
356
357         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().
358                 setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).
359                         get(10, TimeUnit.SECONDS);
360         verifySuccessfulRpcResult(rpcResult);
361         service1.close();
362
363         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
364         verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
365     }
366
367     @Test
368     public void testRemoveShardLeaderReplica() throws Exception {
369         String name = "testRemoveShardLeaderReplica";
370         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
371         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
372                 moduleShardsConfig(moduleShardsConfig).
373                 datastoreContextBuilder(DatastoreContext.newBuilder().
374                         shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
375
376         MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
377                 moduleShardsConfig(moduleShardsConfig).build();
378
379         MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
380                 moduleShardsConfig(moduleShardsConfig).build();
381
382         leaderNode1.configDataStore().waitTillReady();
383         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
384         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
385         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
386
387         replicaNode2.waitForMembersUp("member-1", "member-3");
388         replicaNode2.waitForMembersUp("member-1", "member-2");
389
390         // Invoke RPC service on leader member-1 to remove it's local shard
391
392         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
393                 leaderNode1.operDataStore());
394
395         RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().
396                 setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()).
397                         get(10, TimeUnit.SECONDS);
398         verifySuccessfulRpcResult(rpcResult);
399         service1.close();
400
401         verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() {
402             @Override
403             public void verify(OnDemandRaftState raftState) {
404                 assertThat("Leader Id", raftState.getLeader(), anyOf(containsString("member-2"),
405                         containsString("member-3")));
406             }
407         });
408
409         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-3");
410         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-2");
411         verifyNoShardPresent(leaderNode1.configDataStore(), "cars");
412     }
413
414     @Test
415     public void testAddReplicasForAllShards() throws Exception {
416         String name = "testAddReplicasForAllShards";
417         String moduleShardsConfig = "module-shards-member1.conf";
418         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
419                 moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
420
421         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
422                 "pets", null, Arrays.asList(MEMBER_1));
423         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
424                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
425         leaderNode1.kit().expectMsgClass(Success.class);
426         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
427
428         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
429                 moduleShardsConfig(moduleShardsConfig).build();
430
431         leaderNode1.waitForMembersUp("member-2");
432         newReplicaNode2.waitForMembersUp("member-1");
433
434         newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
435                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
436         newReplicaNode2.kit().expectMsgClass(Success.class);
437
438         newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
439                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
440                         "no-leader", null, Arrays.asList(MEMBER_1)), Shard.builder(), null),
441                                 newReplicaNode2.kit().getRef());
442         newReplicaNode2.kit().expectMsgClass(Success.class);
443
444         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
445                 newReplicaNode2.operDataStore());
446
447         RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
448         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
449         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
450                 successShardResult("people", DataStoreType.Config),
451                 successShardResult("pets", DataStoreType.Config),
452                 successShardResult("cars", DataStoreType.Operational),
453                 successShardResult("people", DataStoreType.Operational),
454                 failedShardResult("no-leader", DataStoreType.Operational));
455
456         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
457         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
458         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
459         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
460         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
461
462         service.close();
463     }
464
465     @Test
466     public void testRemoveAllShardReplicas() throws Exception {
467         String name = "testRemoveAllShardReplicas";
468         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
469         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
470                 moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
471                         shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
472
473         MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
474                 moduleShardsConfig(moduleShardsConfig).build();
475
476         MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
477                 moduleShardsConfig(moduleShardsConfig).build();
478
479         leaderNode1.configDataStore().waitTillReady();
480         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
481         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
482         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
483
484         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
485                 "pets", null, Arrays.asList(MEMBER_1, MEMBER_2, MEMBER_3));
486         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
487                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
488         leaderNode1.kit().expectMsgClass(Success.class);
489
490         replicaNode2.configDataStore().getActorContext().getShardManager().tell(
491                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode2.kit().getRef());
492         replicaNode2.kit().expectMsgClass(Success.class);
493
494         replicaNode3.configDataStore().getActorContext().getShardManager().tell(
495                 new CreateShard(petsModuleConfig, Shard.builder(), null), replicaNode3.kit().getRef());
496         replicaNode3.kit().expectMsgClass(Success.class);
497
498         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2", "member-3");
499         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
500         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
501
502         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
503                 replicaNode3.operDataStore());
504
505         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
506                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
507         RemoveAllShardReplicasOutput result = verifySuccessfulRpcResult(rpcResult);
508         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
509                 successShardResult("people", DataStoreType.Config),
510                 successShardResult("pets", DataStoreType.Config),
511                 successShardResult("cars", DataStoreType.Operational),
512                 successShardResult("people", DataStoreType.Operational));
513
514         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
515         verifyRaftPeersPresent(leaderNode1.configDataStore(), "people", "member-2");
516         verifyRaftPeersPresent(leaderNode1.configDataStore(), "pets", "member-2");
517         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
518         verifyRaftPeersPresent(replicaNode2.configDataStore(), "people", "member-1");
519         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1");
520         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
521         verifyNoShardPresent(replicaNode3.configDataStore(), "people");
522         verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
523
524         service3.close();
525     }
526
527     @Test
528     public void testConvertMembersToVotingForAllShards() {
529         // TODO implement
530     }
531
532     @Test
533     public void testConvertMembersToNonvotingForAllShards() {
534         // TODO implement
535     }
536
537     private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
538         Map<String, ShardResult> expResultsMap = new HashMap<>();
539         for(ShardResult r: expShardResults) {
540             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
541         }
542
543         for(ShardResult result: shardResults) {
544             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
545             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
546                     result.getDataStoreType()), exp);
547             assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
548             if(exp.isSucceeded()) {
549                 assertNull("Expected null error message", result.getErrorMessage());
550             } else {
551                 assertNotNull("Expected error message", result.getErrorMessage());
552             }
553         }
554
555         if(!expResultsMap.isEmpty()) {
556             fail("Missing shard results for " + expResultsMap.keySet());
557         }
558     }
559
560     private static ShardResult successShardResult(String shardName, DataStoreType type) {
561         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(true).build();
562     }
563
564     private static ShardResult failedShardResult(String shardName, DataStoreType type) {
565         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(false).build();
566     }
567 }