765d8a1033e59c75e500b3041cb55d311995b709
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcServiceTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.fail;
13 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
14 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
15 import akka.actor.ActorRef;
16 import akka.actor.PoisonPill;
17 import akka.cluster.Cluster;
18 import com.google.common.base.Optional;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.collect.Iterables;
21 import com.google.common.collect.Sets;
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.concurrent.TimeUnit;
29 import org.apache.commons.lang3.SerializationUtils;
30 import org.junit.After;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
34 import org.opendaylight.controller.cluster.datastore.MemberNode;
35 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
36 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
37 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
38 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
39 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
40 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
49
50 /**
51  * Unit tests for ClusterAdminRpcService.
52  *
53  * @author Thomas Pantelis
54  */
55 public class ClusterAdminRpcServiceTest {
56     private final List<MemberNode> memberNodes = new ArrayList<>();
57
58     @Before
59     public void setUp() {
60         InMemoryJournal.clear();
61         InMemorySnapshotStore.clear();
62     }
63
64     @After
65     public void tearDown() {
66         for(MemberNode m: memberNodes) {
67             m.cleanup();
68         }
69     }
70
71     @Test
72     public void testBackupDatastore() throws Exception {
73         MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1").
74                 moduleShardsConfig("module-shards-member1.conf").
75                 waitForShardLeader("cars", "people").testName("testBackupDatastore").build();
76
77         String fileName = "target/testBackupDatastore";
78         new File(fileName).delete();
79
80         ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore());
81
82         RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder().
83                 setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
84         verifySuccessfulRpcResult(rpcResult);
85
86         try(FileInputStream fis = new FileInputStream(fileName)) {
87             List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
88             assertEquals("DatastoreSnapshot size", 2, snapshots.size());
89
90             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
91                     snapshots.get(1).getType(), snapshots.get(1));
92             verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreType(),
93                     map.get(node.configDataStore().getActorContext().getDataStoreType()), "cars", "people");
94         } finally {
95             new File(fileName).delete();
96         }
97
98         // Test failure by killing a shard.
99
100         node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder().
101                 shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
102
103         ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
104         node.kit().watch(carsShardActor);
105         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
106         node.kit().expectTerminated(carsShardActor);
107
108         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()).
109                 get(5, TimeUnit.SECONDS);
110         assertEquals("isSuccessful", false, rpcResult.isSuccessful());
111         assertEquals("getErrors", 1, rpcResult.getErrors().size());
112
113         service.close();
114     }
115
116     private void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
117         assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
118         Set<String> shardNames = new HashSet<>();
119         for(DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
120             shardNames.add(s.getName());
121         }
122
123         assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
124     }
125
126     @Test
127     public void testAddShardReplica() throws Exception {
128         String name = "testAddShardReplica";
129         String moduleShardsConfig = "module-shards-cars-member-1.conf";
130         MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
131                 moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build();
132
133         MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
134                 moduleShardsConfig(moduleShardsConfig).build();
135
136         leaderNode1.waitForMembersUp("member-2");
137
138         doAddShardReplica(newReplicaNode2, "cars", "member-1");
139
140         MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
141                 moduleShardsConfig(moduleShardsConfig).build();
142
143         leaderNode1.waitForMembersUp("member-3");
144         newReplicaNode2.waitForMembersUp("member-3");
145
146         doAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
147
148         verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
149         verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
150
151         // Write data to member-2's config datastore and read/verify via member-3
152         NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
153                 newReplicaNode3.configDataStore());
154
155         // Write data to member-3's oper datastore and read/verify via member-2
156         writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
157
158         // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
159         // 2 ServerConfigurationPayload entries and the transaction payload entry.
160
161         RaftStateVerifier verifier = new RaftStateVerifier() {
162             @Override
163             public void verify(OnDemandRaftState raftState) {
164                 assertEquals("Commit index", 2, raftState.getCommitIndex());
165                 assertEquals("Last applied index", 2, raftState.getLastApplied());
166             }
167         };
168
169         verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
170         verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
171
172         verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
173         verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
174
175         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
176         verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
177
178         // Restart member-3 and verify the cars config shard is re-instated.
179
180         Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
181         newReplicaNode3.cleanup();
182
183         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
184                 moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
185
186         verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
187         readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
188     }
189
190     @Test
191     public void testAddShardReplicaFailures() throws Exception {
192         String name = "testAddShardReplicaFailures";
193         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
194                 moduleShardsConfig("module-shards-cars-member-1.conf").build();
195
196         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
197                 memberNode.operDataStore());
198
199         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().
200                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
201         verifyFailedRpcResult(rpcResult);
202
203         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("cars").
204                 build()).get(10, TimeUnit.SECONDS);
205         verifyFailedRpcResult(rpcResult);
206
207         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people").
208                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
209         verifyFailedRpcResult(rpcResult);
210
211         service.close();
212     }
213
214     private NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
215             DistributedDataStore readFromStore) throws Exception {
216         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
217         NormalizedNode<?, ?> carsNode = CarsModel.create();
218         writeTx.write(CarsModel.BASE_PATH, carsNode);
219
220         DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
221         Boolean canCommit = cohort .canCommit().get(7, TimeUnit.SECONDS);
222         assertEquals("canCommit", true, canCommit);
223         cohort.preCommit().get(5, TimeUnit.SECONDS);
224         cohort.commit().get(5, TimeUnit.SECONDS);
225
226         readCarsNodeAndVerify(readFromStore, carsNode);
227         return carsNode;
228     }
229
230     private void readCarsNodeAndVerify(DistributedDataStore readFromStore,
231             NormalizedNode<?, ?> expCarsNode) throws Exception {
232         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction().
233                 read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
234         assertEquals("isPresent", true, optional.isPresent());
235         assertEquals("Data node", expCarsNode, optional.get());
236     }
237
238     private void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
239             throws Exception {
240         memberNode.waitForMembersUp(peerMemberNames);
241
242         ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
243                 memberNode.operDataStore());
244
245         RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
246                 setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
247         verifySuccessfulRpcResult(rpcResult);
248
249         verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
250
251         Optional<ActorRef> optional = memberNode.operDataStore().getActorContext().findLocalShard(shardName);
252         assertEquals("Oper shard present", false, optional.isPresent());
253
254         rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
255                 setDataStoreType(DataStoreType.Operational).build()).get(10, TimeUnit.SECONDS);
256         verifySuccessfulRpcResult(rpcResult);
257
258         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
259
260         service.close();
261     }
262
263     private void verifySuccessfulRpcResult(RpcResult<Void> rpcResult) {
264         if(!rpcResult.isSuccessful()) {
265             if(rpcResult.getErrors().size() > 0) {
266                 RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
267                 throw new AssertionError("Rpc failed with error: " + error, error.getCause());
268             }
269
270             fail("Rpc failed with no error");
271         }
272     }
273
274     private void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
275         assertEquals("RpcResult", false, rpcResult.isSuccessful());
276         assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
277         RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
278         assertNotNull("RpcResult error message null", error.getMessage());
279     }
280
281     @Test
282     public void testRemoveShardReplica() {
283         // TODO implement
284     }
285
286     @Test
287     public void testAddReplicasForAllShards() {
288         // TODO implement
289     }
290
291     @Test
292     public void testRemoveAllShardReplicas() {
293         // TODO implement
294     }
295
296     @Test
297     public void testConvertMembersToVotingForAllShards() {
298         // TODO implement
299     }
300
301     @Test
302     public void testConvertMembersToNonvotingForAllShards() {
303         // TODO implement
304     }
305 }