BUG 2187 Implement Remove Shard Replica RPC
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertNull;
13 import static org.junit.Assert.fail;
14 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
15 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
16 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
17 import akka.actor.ActorRef;
18 import akka.actor.PoisonPill;
19 import akka.actor.Status.Success;
20 import akka.cluster.Cluster;
21 import com.google.common.base.Optional;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.collect.Iterables;
24 import com.google.common.collect.Sets;
25 import java.io.File;
26 import java.io.FileInputStream;
27 import java.net.URI;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.TimeUnit;
36 import org.apache.commons.lang3.SerializationUtils;
37 import org.junit.After;
38 import org.junit.Before;
39 import org.junit.Test;
40 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
41 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
42 import org.opendaylight.controller.cluster.datastore.MemberNode;
43 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
44 import org.opendaylight.controller.cluster.datastore.Shard;
45 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
46 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
47 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
48 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
49 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
50 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
51 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
61 import org.opendaylight.yangtools.yang.common.RpcError;
62 import org.opendaylight.yangtools.yang.common.RpcResult;
63 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
64
65 /**
66  * Unit tests for ClusterAdminRpcService.
67  *
68  * @author Thomas Pantelis
69  */
70 public class ClusterAdminRpcServiceTest {
71     private final List<MemberNode> memberNodes = new ArrayList<>();
72
73     @Before
74     public void setUp() {
75         InMemoryJournal.clear();
76         InMemorySnapshotStore.clear();
77     }
78
79     @After
80     public void tearDown() {
81         for(MemberNode m: memberNodes) {
82             m.cleanup();
83         }
84     }
85
86     @Test
87     public void testBackupDatastore() throws Exception {
88         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1").
89                 moduleShardsConfig("module-shards-member1.conf").
90                 waitForShardLeader("cars", "people").testName("testBackupDatastore").build();
91
92         String fileName = "target/testBackupDatastore";
93         new File(fileName).delete();
94
95         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore());
96
97         RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder().
98                 setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
99         verifySuccessfulRpcResult(rpcResult);
100
101         try(FileInputStream fis = new FileInputStream(fileName)) {
102             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
103             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
104
105             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
106                     snapshots.get(1).getType(), snapshots.get(1));
107             verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreType(),
108                     map.get(node.configDataStore().getActorContext().getDataStoreType()), "cars", "people");
109         } finally {
110             new File(fileName).delete();
111         }
112
113         // Test failure by killing a shard.
114
115         node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder().
116                 shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
117
118         ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
119         node.kit().watch(carsShardActor);
120         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
121         node.kit().expectTerminated(carsShardActor);
122
123         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()).
124                 get(5, TimeUnit.SECONDS);
125         assertEquals("isSuccessful", false, rpcResult.isSuccessful());
126         assertEquals("getErrors", 1, rpcResult.getErrors().size());
127
128         service.close();
129     }
130
131     private void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
132         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
133         Set<String> shardNames = new HashSet<>();
134         for(DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
135             shardNames.add(s.getName());
136         }
137
138         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
139     }
140
141     @Test
142     public void testAddShardReplica() throws Exception {
143         String name = "testAddShardReplica";
144         String moduleShardsConfig = "module-shards-cars-member-1.conf";
145         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
146                 moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
147
148         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
149                 moduleShardsConfig(moduleShardsConfig).build();
150
151         leaderNode1.waitForMembersUp("member-2");
152
153         doAddShardReplica(newReplicaNode2, "cars", "member-1");
154
155         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
156                 moduleShardsConfig(moduleShardsConfig).build();
157
158         leaderNode1.waitForMembersUp("member-3");
159         newReplicaNode2.waitForMembersUp("member-3");
160
161         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
162
163         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
164         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
165
166         // Write data to member-2's config datastore and read/verify via member-3
167         NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
168                 newReplicaNode3.configDataStore());
169
170         // Write data to member-3's oper datastore and read/verify via member-2
171         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
172
173         // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
174         // 2 ServerConfigurationPayload entries and the transaction payload entry.
175
176         RaftStateVerifier verifier = new RaftStateVerifier() {
177             @Override
178             public void verify(OnDemandRaftState raftState) {
179                 assertEquals("Commit index", 2, raftState.getCommitIndex());
180                 assertEquals("Last applied index", 2, raftState.getLastApplied());
181             }
182         };
183
184         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
185         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
186
187         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
188         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
189
190         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
191         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
192
193         // Restart member-3 and verify the cars config shard is re-instated.
194
195         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
196         newReplicaNode3.cleanup();
197
198         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
199                 moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
200
201         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
202         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
203     }
204
205     @Test
206     public void testAddShardReplicaFailures() throws Exception {
207         String name = "testAddShardReplicaFailures";
208         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
209                 moduleShardsConfig("module-shards-cars-member-1.conf").build();
210
211         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
212                 memberNode.operDataStore());
213
214         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().
215                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
216         verifyFailedRpcResult(rpcResult);
217
218         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars").
219                 build()).get(10, TimeUnit.SECONDS);
220         verifyFailedRpcResult(rpcResult);
221
222         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people").
223                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
224         verifyFailedRpcResult(rpcResult);
225
226         service.close();
227     }
228
229     private NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
230             DistributedDataStore readFromStore) throws Exception {
231         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
232         NormalizedNode<?, ?> carsNode = CarsModel.create();
233         writeTx.write(CarsModel.BASE_PATH, carsNode);
234
235         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
236         Boolean canCommit = cohort .canCommit().get(7, TimeUnit.SECONDS);
237         assertEquals("canCommit", true, canCommit);
238         cohort.preCommit().get(5, TimeUnit.SECONDS);
239         cohort.commit().get(5, TimeUnit.SECONDS);
240
241         readCarsNodeAndVerify(readFromStore, carsNode);
242         return carsNode;
243     }
244
245     private void readCarsNodeAndVerify(DistributedDataStore readFromStore,
246             NormalizedNode<?, ?> expCarsNode) throws Exception {
247         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction().
248                 read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
249         assertEquals("isPresent", true, optional.isPresent());
250         assertEquals("Data node", expCarsNode, optional.get());
251     }
252
253     private void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
254             throws Exception {
255         memberNode.waitForMembersUp(peerMemberNames);
256
257         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
258                 memberNode.operDataStore());
259
260         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
261                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
262         verifySuccessfulRpcResult(rpcResult);
263
264         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
265
266         Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
267         assertEquals("Oper shard present", false, optional.isPresent());
268
269         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
270                 setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
271         verifySuccessfulRpcResult(rpcResult);
272
273         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
274
275         service.close();
276     }
277
278     private <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
279         if(!rpcResult.isSuccessful()) {
280             if(rpcResult.getErrors().size() > 0) {
281                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
282                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
283             }
284
285             fail("Rpc failed with no error");
286         }
287
288         return rpcResult.getResult();
289     }
290
291     private void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
292         assertEquals("RpcResult", false, rpcResult.isSuccessful());
293         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
294         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
295         assertNotNull("RpcResult error message null", error.getMessage());
296     }
297
298     @Test
299     public void testRemoveShardReplica() throws Exception {
300         String name = "testRemoveShardReplicaLocal";
301         String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
302         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
303                 moduleShardsConfig(moduleShardsConfig).
304                 datastoreContextBuilder(DatastoreContext.newBuilder().
305                         shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
306
307         MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
308                 moduleShardsConfig(moduleShardsConfig).build();
309
310         MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
311                 moduleShardsConfig(moduleShardsConfig).build();
312
313         leaderNode1.configDataStore().waitTillReady();
314         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
315         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
316         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
317
318         // Invoke RPC service on member-3 to remove it's local shard
319
320         ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
321                 replicaNode3.operDataStore());
322
323         RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder().
324                 setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()).
325                         get(10, TimeUnit.SECONDS);
326         verifySuccessfulRpcResult(rpcResult);
327         service3.close();
328
329         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
330         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
331         verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
332
333         // Restart member-2 and verify member-3 isn't present.
334
335         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
336         replicaNode2.cleanup();
337
338         replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
339                 moduleShardsConfig(moduleShardsConfig).build();
340
341         verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
342
343         // Invoke RPC service on member-1 to remove member-2
344
345         ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
346                 leaderNode1.operDataStore());
347
348         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().
349                 setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).
350                         get(10, TimeUnit.SECONDS);
351         verifySuccessfulRpcResult(rpcResult);
352         service1.close();
353
354         verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
355         verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
356     }
357
358     @Test
359     public void testAddReplicasForAllShards() throws Exception {
360         String name = "testAddReplicasForAllShards";
361         String moduleShardsConfig = "module-shards-member1.conf";
362         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
363                 moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars", "people").build();
364
365         ModuleShardConfiguration petsModuleConfig = new ModuleShardConfiguration(URI.create("pets-ns"), "pets-module",
366                 "pets", null, Arrays.asList("member-1"));
367         leaderNode1.configDataStore().getActorContext().getShardManager().tell(
368                 new CreateShard(petsModuleConfig, Shard.builder(), null), leaderNode1.kit().getRef());
369         leaderNode1.kit().expectMsgClass(Success.class);
370         leaderNode1.kit().waitUntilLeader(leaderNode1.configDataStore().getActorContext(), "pets");
371
372         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
373                 moduleShardsConfig(moduleShardsConfig).build();
374
375         leaderNode1.waitForMembersUp("member-2");
376         newReplicaNode2.waitForMembersUp("member-1");
377
378         newReplicaNode2.configDataStore().getActorContext().getShardManager().tell(
379                 new CreateShard(petsModuleConfig, Shard.builder(), null), newReplicaNode2.kit().getRef());
380         newReplicaNode2.kit().expectMsgClass(Success.class);
381
382         newReplicaNode2.operDataStore().getActorContext().getShardManager().tell(
383                 new CreateShard(new ModuleShardConfiguration(URI.create("no-leader-ns"), "no-leader-module",
384                         "no-leader", null, Arrays.asList("member-1")), Shard.builder(), null),
385                                 newReplicaNode2.kit().getRef());
386         newReplicaNode2.kit().expectMsgClass(Success.class);
387
388         ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
389                 newReplicaNode2.operDataStore());
390
391         RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
392         AddReplicasForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
393         verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
394                 successShardResult("people", DataStoreType.Config),
395                 successShardResult("pets", DataStoreType.Config),
396                 successShardResult("cars", DataStoreType.Operational),
397                 successShardResult("people", DataStoreType.Operational),
398                 failedShardResult("no-leader", DataStoreType.Operational));
399
400         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1");
401         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "people", "member-1");
402         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
403         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
404         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
405
406         service.close();
407     }
408
409     @Test
410     public void testRemoveAllShardReplicas() {
411         // TODO implement
412     }
413
414     @Test
415     public void testConvertMembersToVotingForAllShards() {
416         // TODO implement
417     }
418
419     @Test
420     public void testConvertMembersToNonvotingForAllShards() {
421         // TODO implement
422     }
423
424     private void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
425         Map<String, ShardResult> expResultsMap = new HashMap<>();
426         for(ShardResult r: expShardResults) {
427             expResultsMap.put(r.getShardName() + "-" + r.getDataStoreType(), r);
428         }
429
430         for(ShardResult result: shardResults) {
431             ShardResult exp = expResultsMap.remove(result.getShardName() + "-" + result.getDataStoreType());
432             assertNotNull(String.format("Unexpected result for shard %s, type %s", result.getShardName(),
433                     result.getDataStoreType()), exp);
434             assertEquals("isSucceeded", exp.isSucceeded(), result.isSucceeded());
435             if(exp.isSucceeded()) {
436                 assertNull("Expected null error message", result.getErrorMessage());
437             } else {
438                 assertNotNull("Expected error message", result.getErrorMessage());
439             }
440         }
441
442         if(!expResultsMap.isEmpty()) {
443             fail("Missing shard results for " + expResultsMap.keySet());
444         }
445     }
446
447     private ShardResult successShardResult(String shardName, DataStoreType type) {
448         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(true).build();
449     }
450
451     private ShardResult failedShardResult(String shardName, DataStoreType type) {
452         return new ShardResultBuilder().setDataStoreType(type).setShardName(shardName).setSucceeded(false).build();
453     }
454 }