Transaction retry when leader is isolated
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.eq;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.verify;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSelection;
20 import akka.actor.ActorSystem;
21 import akka.actor.Address;
22 import akka.actor.AddressFromURIString;
23 import akka.cluster.Cluster;
24 import akka.dispatch.Futures;
25 import akka.pattern.AskTimeoutException;
26 import akka.testkit.JavaTestKit;
27 import com.google.common.base.Optional;
28 import com.google.common.collect.ImmutableMap;
29 import com.google.common.util.concurrent.MoreExecutors;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import com.typesafe.config.ConfigFactory;
32 import java.math.BigInteger;
33 import java.util.Arrays;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.TimeUnit;
36 import org.junit.After;
37 import org.junit.Before;
38 import org.junit.Test;
39 import org.mockito.Mockito;
40 import org.mockito.invocation.InvocationOnMock;
41 import org.mockito.stubbing.Answer;
42 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
43 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier;
44 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
45 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
46 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
47 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
48 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
51 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
54 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
55 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
56 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
57 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
58 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
59 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
60 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
61 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
62 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
63 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
64 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
65 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
66 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
67 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
68 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
72 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
74 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
75 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
76 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
77 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
79 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
80 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
81 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
82 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
83 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
84 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
85 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
86 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
87
88 /**
89  * End-to-end distributed data store tests that exercise remote shards and transactions.
90  *
91  * @author Thomas Pantelis
92  */
93 public class DistributedDataStoreRemotingIntegrationTest {
94
95     private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
96     private static final String[] CARS = {"cars"};
97
98     private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
99     private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
100
101     private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
102     private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
103     private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf";
104
105     private ActorSystem leaderSystem;
106     private ActorSystem followerSystem;
107     private ActorSystem follower2System;
108
109     private final DatastoreContext.Builder leaderDatastoreContextBuilder =
110             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
111
112     private final DatastoreContext.Builder followerDatastoreContextBuilder =
113             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5).
114                 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
115
116     private DistributedDataStore followerDistributedDataStore;
117     private DistributedDataStore leaderDistributedDataStore;
118     private IntegrationTestKit followerTestKit;
119     private IntegrationTestKit leaderTestKit;
120
121     @Before
122     public void setUp() {
123         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
124         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
125
126         followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
127         Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
128
129         follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
130         Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
131     }
132
133     @After
134     public void tearDown() {
135         JavaTestKit.shutdownActorSystem(leaderSystem);
136         JavaTestKit.shutdownActorSystem(followerSystem);
137         JavaTestKit.shutdownActorSystem(follower2System);
138     }
139
140     private void initDatastoresWithCars(String type) {
141         initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS);
142     }
143
144     private void initDatastoresWithCarsAndPeople(String type) {
145         initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE);
146     }
147
148     private void initDatastores(String type, String moduleShardsConfig, String[] shards) {
149         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
150
151         leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards);
152
153         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
154         followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards);
155
156         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
157     }
158
159     private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
160         Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
161         assertEquals("isPresent", true, optional.isPresent());
162
163         CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME);
164         for(NormalizedNode<?, ?> entry: entries) {
165             listBuilder.withChild((MapEntryNode) entry);
166         }
167
168         assertEquals("Car list node", listBuilder.build(), optional.get());
169     }
170
171     private static void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path, NormalizedNode<?, ?> expNode)
172             throws Exception {
173         Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
174         assertEquals("isPresent", true, optional.isPresent());
175         assertEquals("Data node", expNode, optional.get());
176     }
177
178     private static void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
179         Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
180         assertEquals("exists", true, exists);
181     }
182
183     @Test
184     public void testWriteTransactionWithSingleShard() throws Exception {
185         String testName = "testWriteTransactionWithSingleShard";
186         initDatastoresWithCars(testName);
187
188         String followerCarShardName = "member-2-shard-cars-" + testName;
189         InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
190
191         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
192         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
193
194         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
195         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
196
197         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
198         YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
199         writeTx.merge(car1Path, car1);
200
201         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
202         YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
203         writeTx.merge(car2Path, car2);
204
205         followerTestKit.doCommit(writeTx.ready());
206
207         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
208
209         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
210
211         // Test delete
212
213         writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
214
215         writeTx.delete(car1Path);
216
217         followerTestKit.doCommit(writeTx.ready());
218
219         verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);
220
221         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
222
223         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);
224
225         // Re-instate the follower member 2 as a single-node to verify replication and recovery.
226
227         InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
228
229         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
230         JavaTestKit.shutdownActorSystem(followerSystem, null, true);
231
232         ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
233
234         DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder).
235                 setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE);
236
237         verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
238
239         JavaTestKit.shutdownActorSystem(newSystem);
240     }
241
242     @Test
243     public void testReadWriteTransactionWithSingleShard() throws Exception {
244         initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
245
246         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
247         assertNotNull("newReadWriteTransaction returned null", rwTx);
248
249         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
250         rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
251
252         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
253         rwTx.merge(CarsModel.newCarPath("optima"), car1);
254
255         verifyCars(rwTx, car1);
256
257         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
258         YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
259         rwTx.merge(car2Path, car2);
260
261         verifyExists(rwTx, car2Path);
262
263         followerTestKit.doCommit(rwTx.ready());
264
265         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
266     }
267
268     @Test
269     public void testWriteTransactionWithMultipleShards() throws Exception {
270         initDatastoresWithCarsAndPeople("testWriteTransactionWithMultipleShards");
271
272         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
273         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
274
275         YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
276         NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
277         writeTx.write(carsPath, carsNode);
278
279         YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
280         NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
281         writeTx.write(peoplePath, peopleNode);
282
283         followerTestKit.doCommit(writeTx.ready());
284
285         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
286
287         verifyNode(readTx, carsPath, carsNode);
288         verifyNode(readTx, peoplePath, peopleNode);
289     }
290
291     @Test
292     public void testReadWriteTransactionWithMultipleShards() throws Exception {
293         initDatastoresWithCarsAndPeople("testReadWriteTransactionWithMultipleShards");
294
295         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
296         assertNotNull("newReadWriteTransaction returned null", rwTx);
297
298         YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
299         NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
300         rwTx.write(carsPath, carsNode);
301
302         YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
303         NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
304         rwTx.write(peoplePath, peopleNode);
305
306         followerTestKit.doCommit(rwTx.ready());
307
308         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
309
310         verifyNode(readTx, carsPath, carsNode);
311         verifyNode(readTx, peoplePath, peopleNode);
312     }
313
314     @Test
315     public void testTransactionChainWithSingleShard() throws Exception {
316         initDatastoresWithCars("testTransactionChainWithSingleShard");
317
318         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
319
320         // Add the top-level cars container with write-only.
321
322         DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
323         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
324
325         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
326
327         writeTx.ready();
328
329         // Verify the top-level cars container with read-only.
330
331         verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
332
333         // Perform car operations with read-write.
334
335         DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
336
337         verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
338
339         rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
340
341         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
342         YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
343         rwTx.write(car1Path, car1);
344
345         verifyExists(rwTx, car1Path);
346
347         verifyCars(rwTx, car1);
348
349         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
350         rwTx.merge(CarsModel.newCarPath("sportage"), car2);
351
352         rwTx.delete(car1Path);
353
354         followerTestKit.doCommit(rwTx.ready());
355
356         txChain.close();
357
358         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
359     }
360
361     @Test
362     public void testTransactionChainWithMultipleShards() throws Exception{
363         initDatastoresWithCarsAndPeople("testTransactionChainWithMultipleShards");
364
365         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
366
367         DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
368         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
369
370         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
371         writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
372
373         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
374         writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
375
376         followerTestKit.doCommit(writeTx.ready());
377
378         DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
379
380         MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
381         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
382         readWriteTx.write(carPath, car);
383
384         MapEntryNode person = PeopleModel.newPersonEntry("jack");
385         YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
386         readWriteTx.merge(personPath, person);
387
388         Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
389         assertEquals("isPresent", true, optional.isPresent());
390         assertEquals("Data node", car, optional.get());
391
392         optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
393         assertEquals("isPresent", true, optional.isPresent());
394         assertEquals("Data node", person, optional.get());
395
396         DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
397
398         writeTx = txChain.newWriteOnlyTransaction();
399
400         writeTx.delete(personPath);
401
402         DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
403
404         followerTestKit.doCommit(cohort2);
405         followerTestKit.doCommit(cohort3);
406
407         txChain.close();
408
409         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
410         verifyCars(readTx, car);
411
412         optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
413         assertEquals("isPresent", false, optional.isPresent());
414     }
415
416     @Test
417     public void testChainedTransactionFailureWithSingleShard() throws Exception {
418         initDatastoresWithCars("testChainedTransactionFailureWithSingleShard");
419
420         ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
421                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
422                         LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
423                         MoreExecutors.directExecutor());
424
425         TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
426         DOMTransactionChain txChain = broker.createTransactionChain(listener);
427
428         DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
429
430         ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
431                 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
432                     withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
433
434         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
435
436         try {
437             writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
438             fail("Expected TransactionCommitFailedException");
439         } catch (TransactionCommitFailedException e) {
440             // Expected
441         }
442
443         verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
444
445         txChain.close();
446         broker.close();
447     }
448
449     @Test
450     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
451         initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards");
452
453         ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
454                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
455                         LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
456                         MoreExecutors.directExecutor());
457
458         TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
459         DOMTransactionChain txChain = broker.createTransactionChain(listener);
460
461         DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
462
463         writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
464
465         ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
466                 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
467                     withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
468
469         // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
470         // done for put for performance reasons.
471         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
472
473         try {
474             writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
475             fail("Expected TransactionCommitFailedException");
476         } catch (TransactionCommitFailedException e) {
477             // Expected
478         }
479
480         verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
481
482         txChain.close();
483         broker.close();
484     }
485
486     @Test
487     public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
488         String testName = "testSingleShardTransactionsWithLeaderChanges";
489         initDatastoresWithCars(testName);
490
491         String followerCarShardName = "member-2-shard-cars-" + testName;
492         InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
493
494         // Write top-level car container from the follower so it uses a remote Tx.
495
496         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
497
498         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
499         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
500
501         followerTestKit.doCommit(writeTx.ready());
502
503         InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
504
505         // Switch the leader to the follower
506
507         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
508                 shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
509
510         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
511
512         followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
513
514         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
515         Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
516
517         DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
518                 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
519         IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
520         newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
521
522         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS);
523
524         // Write a car entry to the new leader - should switch to local Tx
525
526         writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
527
528         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
529         YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
530         writeTx.merge(car1Path, car1);
531
532         followerTestKit.doCommit(writeTx.ready());
533
534         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
535     }
536
537     @Test
538     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
539         initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
540         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
541
542         Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
543         assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
544
545         TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
546         dataTree.setSchemaContext(SchemaContextHelper.full());
547
548         // Send a tx with immediate commit.
549
550         DataTreeModification modification = dataTree.takeSnapshot().newModification();
551         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
552         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
553
554         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
555         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
556         modification.ready();
557
558         ReadyLocalTransaction readyLocal = new ReadyLocalTransaction("tx-1" , modification, true);
559
560         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
561         Object resp = followerTestKit.expectMsgClass(Object.class);
562         if(resp instanceof akka.actor.Status.Failure) {
563             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
564         }
565
566         assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
567
568         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
569
570         // Send another tx without immediate commit.
571
572         modification = dataTree.takeSnapshot().newModification();
573         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
574         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
575         modification.ready();
576
577         readyLocal = new ReadyLocalTransaction("tx-2" , modification, false);
578
579         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
580         resp = followerTestKit.expectMsgClass(Object.class);
581         if(resp instanceof akka.actor.Status.Failure) {
582             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
583         }
584
585         assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
586
587         ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
588                 ((ReadyTransactionReply)resp).getCohortPath());
589
590         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
591                 leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
592         cohort.canCommit().get(5, TimeUnit.SECONDS);
593         cohort.preCommit().get(5, TimeUnit.SECONDS);
594         cohort.commit().get(5, TimeUnit.SECONDS);
595
596         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
597     }
598
599     @Test
600     public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
601         initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
602         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
603
604         Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
605         assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
606
607         carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
608         DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
609
610         // Send a tx with immediate commit.
611
612         DataTreeModification modification = dataTree.takeSnapshot().newModification();
613         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
614         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
615
616         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
617         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
618
619         ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction("tx-1",
620                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
621                         Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true, true);
622
623         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
624         Object resp = followerTestKit.expectMsgClass(Object.class);
625         if(resp instanceof akka.actor.Status.Failure) {
626             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
627         }
628
629         assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
630
631         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
632
633         // Send another tx without immediate commit.
634
635         modification = dataTree.takeSnapshot().newModification();
636         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
637         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
638
639         forwardedReady = new ForwardedReadyTransaction("tx-2",
640                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
641                         Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), true, false);
642
643         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
644         resp = followerTestKit.expectMsgClass(Object.class);
645         if(resp instanceof akka.actor.Status.Failure) {
646             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
647         }
648
649         assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
650
651         ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
652                 ((ReadyTransactionReply)resp).getCohortPath());
653
654         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
655                 leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
656         cohort.canCommit().get(5, TimeUnit.SECONDS);
657         cohort.preCommit().get(5, TimeUnit.SECONDS);
658         cohort.commit().get(5, TimeUnit.SECONDS);
659
660         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
661     }
662
663     @Test
664     public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
665         initDatastoresWithCars("testTransactionForwardedToLeaderAfterRetry");
666
667         // Do an initial write to get the primary shard info cached.
668
669         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
670         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
671         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
672         followerTestKit.doCommit(writeTx.ready());
673
674         // Wait for the commit to be replicated to the follower.
675
676         MemberNode.verifyRaftState(followerDistributedDataStore, "cars", new RaftStateVerifier() {
677             @Override
678             public void verify(OnDemandRaftState raftState) {
679                 assertEquals("getLastApplied", 0, raftState.getLastApplied());
680             }
681         });
682
683         // Create and prepare wo and rw tx's.
684
685         writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
686         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
687         writeTx.write(CarsModel.newCarPath("optima"), car1);
688
689         DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
690         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
691         readWriteTx.write(CarsModel.newCarPath("sportage"), car2);
692
693         IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
694             @Override
695             public void verify(ShardStats stats) {
696                 assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount());
697             }
698         });
699
700         // Disable elections on the leader so it switches to follower.
701
702         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
703                 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
704                 shardElectionTimeoutFactor(10));
705
706         leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
707
708         // Submit tx's and enable elections on the follower so it becomes the leader, at which point the
709         // readied tx's should get forwarded from the previous leader.
710
711         DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
712         DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
713
714         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
715                 customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
716
717         followerTestKit.doCommit(cohort1);
718         followerTestKit.doCommit(cohort2);
719
720         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
721     }
722
723     @Test
724     public void testTransactionWithIsolatedLeader() throws Throwable {
725         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200);
726         String testName = "testTransactionWithIsolatedLeader";
727         initDatastoresWithCars(testName);
728
729         DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
730         failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
731
732         DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
733         successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
734
735         followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
736         followerDistributedDataStore.close();
737         followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
738
739         MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() {
740             @Override
741             public void verify(OnDemandRaftState raftState) {
742                 assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState());
743             }
744         });
745
746         try {
747             leaderTestKit.doCommit(failWriteTx.ready());
748             fail("Expected NoShardLeaderException");
749         } catch (ExecutionException e) {
750             assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass());
751         }
752
753         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
754                 shardElectionTimeoutFactor(100));
755
756         DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready();
757
758         followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName,
759                 MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
760
761         leaderTestKit.doCommit(writeTxCohort);
762     }
763
764     @Test(expected=AskTimeoutException.class)
765     public void testTransactionWithShardLeaderNotResponding() throws Throwable {
766         initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
767
768         // Do an initial read to get the primary shard info cached.
769
770         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
771         readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
772
773         // Shutdown the leader and try to create a new tx.
774
775         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
776
777         followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
778         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
779
780         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
781
782         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
783
784         try {
785             followerTestKit.doCommit(rwTx.ready());
786         } catch (ExecutionException e) {
787             assertTrue("Expected ShardLeaderNotRespondingException cause. Actual: " + e.getCause(),
788                     e.getCause() instanceof ShardLeaderNotRespondingException);
789             assertNotNull("Expected a nested cause", e.getCause().getCause());
790             throw e.getCause().getCause();
791         }
792     }
793
794     @Test(expected=NoShardLeaderException.class)
795     public void testTransactionWithCreateTxFailureDueToNoLeader() throws Throwable {
796         initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
797
798         // Do an initial read to get the primary shard info cached.
799
800         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
801         readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
802
803         // Shutdown the leader and try to create a new tx.
804
805         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
806
807         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
808
809         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
810                 operationTimeoutInMillis(10).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
811
812         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
813
814         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
815
816         try {
817             followerTestKit.doCommit(rwTx.ready());
818         } catch (ExecutionException e) {
819             throw e.getCause();
820         }
821     }
822
823     @Test
824     public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
825         String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
826         initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS);
827
828         DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder().
829                 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
830         IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
831         follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS);
832
833         // Do an initial read to get the primary shard info cached.
834
835         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
836         readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
837
838         // Shutdown the leader and try to create a new tx.
839
840         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
841
842         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
843                 operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
844
845         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
846
847         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
848
849         followerTestKit.doCommit(rwTx.ready());
850     }
851
852     private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
853         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
854         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
855         Answer<DatastoreContext> answer = new Answer<DatastoreContext>() {
856             @Override
857             public DatastoreContext answer(InvocationOnMock invocation) {
858                 return newBuilder.build();
859             }
860         };
861         Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
862         Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
863         dataStore.onDatastoreContextUpdated(mockContextFactory);
864     }
865 }