Bug 3194: Dynamically update PrimaryShardInfo cache when leader changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.AddressFromURIString;
16 import akka.cluster.Cluster;
17 import akka.testkit.JavaTestKit;
18 import com.google.common.base.Optional;
19 import com.typesafe.config.ConfigFactory;
20 import java.math.BigInteger;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Before;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
27 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
28 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
30 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
31 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
32 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
33 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
47 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
48
49 /**
50  * End-to-end distributed data store tests that exercise remote shards and transactions.
51  *
52  * @author Thomas Pantelis
53  */
54 public class DistributedDataStoreRemotingIntegrationTest {
55
56     private static final String[] SHARD_NAMES = {"cars", "people"};
57
58     private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
59     private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
60
61     private static final String MODULE_SHARDS_CONFIG = "module-shards-member1-and-2.conf";
62
63     private ActorSystem leaderSystem;
64     private ActorSystem followerSystem;
65
66     private final DatastoreContext.Builder leaderDatastoreContextBuilder =
67             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
68
69     private final DatastoreContext.Builder followerDatastoreContextBuilder =
70             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
71
72     private DistributedDataStore followerDistributedDataStore;
73     private DistributedDataStore leaderDistributedDataStore;
74     private IntegrationTestKit followerTestKit;
75     private IntegrationTestKit leaderTestKit;
76
77     @Before
78     public void setUpClass() {
79         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
80         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
81
82         followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
83         Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
84     }
85
86     @After
87     public void tearDownClass() {
88         JavaTestKit.shutdownActorSystem(leaderSystem);
89         JavaTestKit.shutdownActorSystem(followerSystem);
90     }
91
92     private void initDatastores(String type) {
93         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
94
95         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
96         followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
97
98         leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
99
100         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
101     }
102
103     private void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
104         Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
105         assertEquals("isPresent", true, optional.isPresent());
106
107         CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME);
108         for(NormalizedNode<?, ?> entry: entries) {
109             listBuilder.withChild((MapEntryNode) entry);
110         }
111
112         assertEquals("Car list node", listBuilder.build(), optional.get());
113     }
114
115     private void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path, NormalizedNode<?, ?> expNode)
116             throws Exception {
117         Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
118         assertEquals("isPresent", true, optional.isPresent());
119         assertEquals("Data node", expNode, optional.get());
120     }
121
122     private void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
123         Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
124         assertEquals("exists", true, exists);
125     }
126
127     @Test
128     public void testWriteTransactionWithSingleShard() throws Exception {
129         String testName = "testWriteTransactionWithSingleShard";
130         initDatastores(testName);
131
132         String followerCarShardName = "member-2-shard-cars-" + testName;
133         InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
134
135         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
136         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
137
138         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
139         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
140
141         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
142         YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
143         writeTx.merge(car1Path, car1);
144
145         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
146         YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
147         writeTx.merge(car2Path, car2);
148
149         followerTestKit.doCommit(writeTx.ready());
150
151         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
152
153         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
154
155         // Test delete
156
157         writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
158
159         writeTx.delete(car1Path);
160
161         followerTestKit.doCommit(writeTx.ready());
162
163         verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);
164
165         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
166
167         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);
168
169         // Re-instate the follower member 2 as a single-node to verify replication and recovery.
170
171         InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
172
173         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
174         JavaTestKit.shutdownActorSystem(followerSystem, null, true);
175
176         ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
177
178         DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder).
179                 setupDistributedDataStore(testName, "module-shards-member2", true, SHARD_NAMES);
180
181         verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
182
183         JavaTestKit.shutdownActorSystem(newSystem);
184     }
185
186     @Test
187     public void testReadWriteTransactionWithSingleShard() throws Exception {
188         initDatastores("testReadWriteTransactionWithSingleShard");
189
190         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
191         assertNotNull("newReadWriteTransaction returned null", rwTx);
192
193         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
194         rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
195
196         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
197         rwTx.merge(CarsModel.newCarPath("optima"), car1);
198
199         verifyCars(rwTx, car1);
200
201         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
202         YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
203         rwTx.merge(car2Path, car2);
204
205         verifyExists(rwTx, car2Path);
206
207         followerTestKit.doCommit(rwTx.ready());
208
209         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
210     }
211
212     @Test
213     public void testWriteTransactionWithMultipleShards() throws Exception {
214         initDatastores("testWriteTransactionWithMultipleShards");
215
216         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
217         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
218
219         YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
220         NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
221         writeTx.write(carsPath, carsNode);
222
223         YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
224         NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
225         writeTx.write(peoplePath, peopleNode);
226
227         followerTestKit.doCommit(writeTx.ready());
228
229         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
230
231         verifyNode(readTx, carsPath, carsNode);
232         verifyNode(readTx, peoplePath, peopleNode);
233     }
234
235     @Test
236     public void testReadWriteTransactionWithMultipleShards() throws Exception {
237         initDatastores("testReadWriteTransactionWithMultipleShards");
238
239         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
240         assertNotNull("newReadWriteTransaction returned null", rwTx);
241
242         YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
243         NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
244         rwTx.write(carsPath, carsNode);
245
246         YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
247         NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
248         rwTx.write(peoplePath, peopleNode);
249
250         followerTestKit.doCommit(rwTx.ready());
251
252         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
253
254         verifyNode(readTx, carsPath, carsNode);
255         verifyNode(readTx, peoplePath, peopleNode);
256     }
257
258     @Test
259     public void testTransactionChainWithSingleShard() throws Exception {
260         initDatastores("testTransactionChainWithSingleShard");
261
262         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
263
264         // Add the top-level cars container with write-only.
265
266         DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
267         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
268
269         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
270
271         writeTx.ready();
272
273         // Verify the top-level cars container with read-only.
274
275         verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
276
277         // Perform car operations with read-write.
278
279         DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
280
281         verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
282
283         rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
284
285         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
286         YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
287         rwTx.write(car1Path, car1);
288
289         verifyExists(rwTx, car1Path);
290
291         verifyCars(rwTx, car1);
292
293         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
294         rwTx.merge(CarsModel.newCarPath("sportage"), car2);
295
296         rwTx.delete(car1Path);
297
298         followerTestKit.doCommit(rwTx.ready());
299
300         txChain.close();
301
302         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
303     }
304
305     @Test
306     public void testTransactionChainWithMultipleShards() throws Exception{
307         initDatastores("testTransactionChainWithMultipleShards");
308
309         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
310
311         DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
312         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
313
314         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
315         writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
316
317         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
318         writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
319
320         followerTestKit.doCommit(writeTx.ready());
321
322         DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
323
324         MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
325         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
326         readWriteTx.write(carPath, car);
327
328         MapEntryNode person = PeopleModel.newPersonEntry("jack");
329         YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
330         readWriteTx.merge(personPath, person);
331
332         Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
333         assertEquals("isPresent", true, optional.isPresent());
334         assertEquals("Data node", car, optional.get());
335
336         optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
337         assertEquals("isPresent", true, optional.isPresent());
338         assertEquals("Data node", person, optional.get());
339
340         DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
341
342         writeTx = txChain.newWriteOnlyTransaction();
343
344         writeTx.delete(personPath);
345
346         DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
347
348         followerTestKit.doCommit(cohort2);
349         followerTestKit.doCommit(cohort3);
350
351         txChain.close();
352
353         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
354         verifyCars(readTx, car);
355
356         optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
357         assertEquals("isPresent", false, optional.isPresent());
358     }
359
360     @Test
361     public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
362         String testName = "testSingleShardTransactionsWithLeaderChanges";
363         initDatastores(testName);
364
365         String followerCarShardName = "member-2-shard-cars-" + testName;
366         InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
367
368         // Write top-level car container from the follower so it uses a remote Tx.
369
370         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
371
372         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
373         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
374
375         followerTestKit.doCommit(writeTx.ready());
376
377         InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
378
379         // Switch the leader to the follower
380
381         followerDatastoreContextBuilder.shardElectionTimeoutFactor(1);
382         followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
383
384         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
385
386         followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
387
388         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
389         Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
390
391         DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
392                 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
393         IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
394         DistributedDataStore newMember1Datastore = newMember1TestKit.
395                     setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
396
397         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
398
399         // Write a car entry to the new leader - should switch to local Tx
400
401         writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
402
403         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
404         YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
405         writeTx.merge(car1Path, car1);
406
407         followerTestKit.doCommit(writeTx.ready());
408
409         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
410     }
411
412     @Test
413     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
414         initDatastores("testReadyLocalTransactionForwardedToLeader");
415
416         Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
417         assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
418
419         TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
420         dataTree.setSchemaContext(SchemaContextHelper.full());
421         DataTreeModification modification = dataTree.takeSnapshot().newModification();
422
423         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
424         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
425
426         MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
427         new WriteModification(CarsModel.newCarPath("optima"), car).apply(modification);
428
429         String transactionID = "tx-1";
430         ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(transactionID , modification, true);
431
432         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
433         followerTestKit.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
434
435         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car);
436     }
437 }