Bug 6587: Retain state when transitioning between Leader and IsolatedLeader
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.eq;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.verify;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSelection;
20 import akka.actor.ActorSystem;
21 import akka.actor.Address;
22 import akka.actor.AddressFromURIString;
23 import akka.cluster.Cluster;
24 import akka.dispatch.Futures;
25 import akka.pattern.AskTimeoutException;
26 import akka.pattern.Patterns;
27 import akka.testkit.JavaTestKit;
28 import com.google.common.base.Optional;
29 import com.google.common.base.Supplier;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.util.concurrent.ListenableFuture;
32 import com.google.common.util.concurrent.MoreExecutors;
33 import com.google.common.util.concurrent.Uninterruptibles;
34 import com.typesafe.config.ConfigFactory;
35 import java.math.BigInteger;
36 import java.util.Arrays;
37 import java.util.LinkedList;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.mockito.Mockito;
44 import org.mockito.stubbing.Answer;
45 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
46 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
47 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
50 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
53 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
56 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
57 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
58 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
59 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
60 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
61 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
62 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
63 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
64 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
65 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
66 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
67 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
68 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
69 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
72 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
74 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
76 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
77 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
79 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
80 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
81 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
82 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
83 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
84 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
85 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
86 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
87 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
88 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
89 import scala.concurrent.Await;
90 import scala.concurrent.Future;
91 import scala.concurrent.duration.FiniteDuration;
92
93 /**
94  * End-to-end distributed data store tests that exercise remote shards and transactions.
95  *
96  * @author Thomas Pantelis
97  */
98 public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
99
100     private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
101     private static final String[] CARS = {"cars"};
102
103     private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
104     private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
105
106     private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
107     private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
108     private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf";
109
110     private ActorSystem leaderSystem;
111     private ActorSystem followerSystem;
112     private ActorSystem follower2System;
113
114     private final DatastoreContext.Builder leaderDatastoreContextBuilder =
115             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
116
117     private final DatastoreContext.Builder followerDatastoreContextBuilder =
118             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5).
119                 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
120     private final TransactionIdentifier tx1 = nextTransactionId();
121     private final TransactionIdentifier tx2 = nextTransactionId();
122
123     private DistributedDataStore followerDistributedDataStore;
124     private DistributedDataStore leaderDistributedDataStore;
125     private IntegrationTestKit followerTestKit;
126     private IntegrationTestKit leaderTestKit;
127
128     @Before
129     public void setUp() {
130         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
131         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
132
133         followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
134         Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
135
136         follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
137         Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
138     }
139
140     @After
141     public void tearDown() {
142         if (followerDistributedDataStore != null) {
143             leaderDistributedDataStore.close();
144         }
145         if (leaderDistributedDataStore != null) {
146             leaderDistributedDataStore.close();
147         }
148
149         JavaTestKit.shutdownActorSystem(leaderSystem);
150         JavaTestKit.shutdownActorSystem(followerSystem);
151         JavaTestKit.shutdownActorSystem(follower2System);
152     }
153
154     private void initDatastoresWithCars(String type) {
155         initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS);
156     }
157
158     private void initDatastoresWithCarsAndPeople(String type) {
159         initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE);
160     }
161
162     private void initDatastores(String type, String moduleShardsConfig, String[] shards) {
163         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
164
165         leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards);
166
167         followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
168         followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards);
169
170         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
171     }
172
173     private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
174         Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
175         assertEquals("isPresent", true, optional.isPresent());
176
177         CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME);
178         for(NormalizedNode<?, ?> entry: entries) {
179             listBuilder.withChild((MapEntryNode) entry);
180         }
181
182         assertEquals("Car list node", listBuilder.build(), optional.get());
183     }
184
185     private static void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path, NormalizedNode<?, ?> expNode)
186             throws Exception {
187         Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
188         assertEquals("isPresent", true, optional.isPresent());
189         assertEquals("Data node", expNode, optional.get());
190     }
191
192     private static void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
193         Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
194         assertEquals("exists", true, exists);
195     }
196
197     @Test
198     public void testWriteTransactionWithSingleShard() throws Exception {
199         String testName = "testWriteTransactionWithSingleShard";
200         initDatastoresWithCars(testName);
201
202         String followerCarShardName = "member-2-shard-cars-" + testName;
203         InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
204
205         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
206         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
207
208         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
209         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
210
211         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
212         YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
213         writeTx.merge(car1Path, car1);
214
215         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
216         YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
217         writeTx.merge(car2Path, car2);
218
219         followerTestKit.doCommit(writeTx.ready());
220
221         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
222
223         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
224
225         // Test delete
226
227         writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
228
229         writeTx.delete(car1Path);
230
231         followerTestKit.doCommit(writeTx.ready());
232
233         verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);
234
235         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
236
237         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);
238
239         // Re-instate the follower member 2 as a single-node to verify replication and recovery.
240
241         InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
242
243         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
244         JavaTestKit.shutdownActorSystem(followerSystem, null, true);
245
246         ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
247
248         try (DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder).
249                 setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE)) {
250             verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
251         }
252
253         JavaTestKit.shutdownActorSystem(newSystem);
254     }
255
256     @Test
257     public void testReadWriteTransactionWithSingleShard() throws Exception {
258         initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
259
260         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
261         assertNotNull("newReadWriteTransaction returned null", rwTx);
262
263         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
264         rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
265
266         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
267         rwTx.merge(CarsModel.newCarPath("optima"), car1);
268
269         verifyCars(rwTx, car1);
270
271         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
272         YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
273         rwTx.merge(car2Path, car2);
274
275         verifyExists(rwTx, car2Path);
276
277         followerTestKit.doCommit(rwTx.ready());
278
279         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
280     }
281
282     @Test
283     public void testWriteTransactionWithMultipleShards() throws Exception {
284         initDatastoresWithCarsAndPeople("testWriteTransactionWithMultipleShards");
285
286         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
287         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
288
289         YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
290         NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
291         writeTx.write(carsPath, carsNode);
292
293         YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
294         NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
295         writeTx.write(peoplePath, peopleNode);
296
297         followerTestKit.doCommit(writeTx.ready());
298
299         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
300
301         verifyNode(readTx, carsPath, carsNode);
302         verifyNode(readTx, peoplePath, peopleNode);
303     }
304
305     @Test
306     public void testReadWriteTransactionWithMultipleShards() throws Exception {
307         initDatastoresWithCarsAndPeople("testReadWriteTransactionWithMultipleShards");
308
309         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
310         assertNotNull("newReadWriteTransaction returned null", rwTx);
311
312         YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
313         NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
314         rwTx.write(carsPath, carsNode);
315
316         YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
317         NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
318         rwTx.write(peoplePath, peopleNode);
319
320         followerTestKit.doCommit(rwTx.ready());
321
322         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
323
324         verifyNode(readTx, carsPath, carsNode);
325         verifyNode(readTx, peoplePath, peopleNode);
326     }
327
328     @Test
329     public void testTransactionChainWithSingleShard() throws Exception {
330         initDatastoresWithCars("testTransactionChainWithSingleShard");
331
332         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
333
334         // Add the top-level cars container with write-only.
335
336         DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
337         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
338
339         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
340
341         writeTx.ready();
342
343         // Verify the top-level cars container with read-only.
344
345         verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
346
347         // Perform car operations with read-write.
348
349         DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
350
351         verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
352
353         rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
354
355         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
356         YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
357         rwTx.write(car1Path, car1);
358
359         verifyExists(rwTx, car1Path);
360
361         verifyCars(rwTx, car1);
362
363         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
364         rwTx.merge(CarsModel.newCarPath("sportage"), car2);
365
366         rwTx.delete(car1Path);
367
368         followerTestKit.doCommit(rwTx.ready());
369
370         txChain.close();
371
372         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
373     }
374
375     @Test
376     public void testTransactionChainWithMultipleShards() throws Exception{
377         initDatastoresWithCarsAndPeople("testTransactionChainWithMultipleShards");
378
379         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
380
381         DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
382         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
383
384         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
385         writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
386
387         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
388         writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
389
390         followerTestKit.doCommit(writeTx.ready());
391
392         DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
393
394         MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
395         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
396         readWriteTx.write(carPath, car);
397
398         MapEntryNode person = PeopleModel.newPersonEntry("jack");
399         YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
400         readWriteTx.merge(personPath, person);
401
402         Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
403         assertEquals("isPresent", true, optional.isPresent());
404         assertEquals("Data node", car, optional.get());
405
406         optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
407         assertEquals("isPresent", true, optional.isPresent());
408         assertEquals("Data node", person, optional.get());
409
410         DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
411
412         writeTx = txChain.newWriteOnlyTransaction();
413
414         writeTx.delete(personPath);
415
416         DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
417
418         followerTestKit.doCommit(cohort2);
419         followerTestKit.doCommit(cohort3);
420
421         txChain.close();
422
423         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
424         verifyCars(readTx, car);
425
426         optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
427         assertEquals("isPresent", false, optional.isPresent());
428     }
429
430     @Test
431     public void testChainedTransactionFailureWithSingleShard() throws Exception {
432         initDatastoresWithCars("testChainedTransactionFailureWithSingleShard");
433
434         ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
435                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
436                         LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
437                         MoreExecutors.directExecutor());
438
439         TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
440         DOMTransactionChain txChain = broker.createTransactionChain(listener);
441
442         DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
443
444         ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
445                 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
446                     withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
447
448         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
449
450         try {
451             writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
452             fail("Expected TransactionCommitFailedException");
453         } catch (TransactionCommitFailedException e) {
454             // Expected
455         }
456
457         verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
458
459         txChain.close();
460         broker.close();
461     }
462
463     @Test
464     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
465         initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards");
466
467         ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
468                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
469                         LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
470                         MoreExecutors.directExecutor());
471
472         TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
473         DOMTransactionChain txChain = broker.createTransactionChain(listener);
474
475         DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
476
477         writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
478
479         ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
480                 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
481                     withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
482
483         // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
484         // done for put for performance reasons.
485         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
486
487         try {
488             writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
489             fail("Expected TransactionCommitFailedException");
490         } catch (TransactionCommitFailedException e) {
491             // Expected
492         }
493
494         verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
495
496         txChain.close();
497         broker.close();
498     }
499
500     @Test
501     public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
502         String testName = "testSingleShardTransactionsWithLeaderChanges";
503         initDatastoresWithCars(testName);
504
505         String followerCarShardName = "member-2-shard-cars-" + testName;
506         InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
507
508         // Write top-level car container from the follower so it uses a remote Tx.
509
510         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
511
512         writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
513         writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
514
515         followerTestKit.doCommit(writeTx.ready());
516
517         InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
518
519         // Switch the leader to the follower
520
521         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
522                 shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
523
524         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
525         Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
526
527         followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
528
529         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
530         Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
531
532         DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
533                 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
534         IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
535
536         try (DistributedDataStore ds =
537                 newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
538
539             followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS);
540
541             // Write a car entry to the new leader - should switch to local Tx
542
543             writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
544
545             MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
546             YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
547             writeTx.merge(car1Path, car1);
548
549             followerTestKit.doCommit(writeTx.ready());
550
551             verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
552         }
553     }
554
555     @SuppressWarnings("unchecked")
556     @Test
557     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
558         initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
559         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
560
561         Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
562         assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
563
564         TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
565         dataTree.setSchemaContext(SchemaContextHelper.full());
566
567         // Send a tx with immediate commit.
568
569         DataTreeModification modification = dataTree.takeSnapshot().newModification();
570         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
571         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
572
573         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
574         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
575         modification.ready();
576
577         ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true);
578
579         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
580         Object resp = followerTestKit.expectMsgClass(Object.class);
581         if(resp instanceof akka.actor.Status.Failure) {
582             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
583         }
584
585         assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
586
587         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
588
589         // Send another tx without immediate commit.
590
591         modification = dataTree.takeSnapshot().newModification();
592         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
593         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
594         modification.ready();
595
596         readyLocal = new ReadyLocalTransaction(tx2 , modification, false);
597
598         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
599         resp = followerTestKit.expectMsgClass(Object.class);
600         if(resp instanceof akka.actor.Status.Failure) {
601             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
602         }
603
604         assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
605
606         ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
607                 ((ReadyTransactionReply)resp).getCohortPath());
608
609         Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
610         Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
611         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
612                 leaderDistributedDataStore.getActorContext(), Arrays.asList(
613                         new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
614         cohort.canCommit().get(5, TimeUnit.SECONDS);
615         cohort.preCommit().get(5, TimeUnit.SECONDS);
616         cohort.commit().get(5, TimeUnit.SECONDS);
617
618         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
619     }
620
621     @SuppressWarnings("unchecked")
622     @Test
623     public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
624         initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
625         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
626
627         Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
628         assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
629
630         carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
631         DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
632
633         // Send a tx with immediate commit.
634
635         DataTreeModification modification = dataTree.takeSnapshot().newModification();
636         new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
637         new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
638
639         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
640         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
641
642         ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
643                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
644                         Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true);
645
646         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
647         Object resp = followerTestKit.expectMsgClass(Object.class);
648         if(resp instanceof akka.actor.Status.Failure) {
649             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
650         }
651
652         assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
653
654         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
655
656         // Send another tx without immediate commit.
657
658         modification = dataTree.takeSnapshot().newModification();
659         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
660         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
661
662         forwardedReady = new ForwardedReadyTransaction(tx2,
663                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
664                         Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false);
665
666         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
667         resp = followerTestKit.expectMsgClass(Object.class);
668         if(resp instanceof akka.actor.Status.Failure) {
669             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
670         }
671
672         assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
673
674         ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
675                 ((ReadyTransactionReply)resp).getCohortPath());
676
677         Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
678         Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
679         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
680                 leaderDistributedDataStore.getActorContext(), Arrays.asList(
681                         new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
682         cohort.canCommit().get(5, TimeUnit.SECONDS);
683         cohort.preCommit().get(5, TimeUnit.SECONDS);
684         cohort.commit().get(5, TimeUnit.SECONDS);
685
686         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
687     }
688
689     @Test
690     public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
691         followerDatastoreContextBuilder.shardBatchedModificationCount(2);
692         leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
693         initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
694
695         // Do an initial write to get the primary shard info cached.
696
697         DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction();
698         initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
699         initialWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
700         followerTestKit.doCommit(initialWriteTx.ready());
701
702         // Wait for the commit to be replicated to the follower.
703
704         MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
705                 raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
706
707         MemberNode.verifyRaftState(followerDistributedDataStore, "people",
708                 raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
709
710         // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
711         // the leader shard.
712
713         DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
714         writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
715         writeTx1.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
716         DOMStoreThreePhaseCommitCohort writeTx1Cohort = writeTx1.ready();
717         ListenableFuture<Boolean> writeTx1CanCommit = writeTx1Cohort.canCommit();
718         writeTx1CanCommit.get(5, TimeUnit.SECONDS);
719
720         // Prepare and ready another WO tx that writes to 2 shards but don't canCommit yet. This will be queued
721         // in the leader shard.
722
723         DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
724         LinkedList<MapEntryNode> cars = new LinkedList<>();
725         int carIndex = 1;
726         cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
727         writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
728         carIndex++;
729         NormalizedNode<?, ?> people = PeopleModel.newPersonMapNode();
730         writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
731         DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
732
733         // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
734         // tx writes 5 cars so 2 BatchedModidifications messages will be sent initially and cached in the
735         // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModidifications will be
736         // sent on ready.
737
738         DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
739         for(int i = 1; i <= 5; i++, carIndex++) {
740             cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
741             writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
742         }
743
744         // Prepare another WO that writes to a single shard. This will send a single BatchedModidifications
745         // message on ready.
746
747         DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
748         cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
749         writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
750         carIndex++;
751
752         // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaciton message to the
753         // leader shard on ready.
754
755         DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
756         cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
757         readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
758
759         IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
760                 stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()));
761
762         // Disable elections on the leader so it switches to follower.
763
764         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
765                 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
766                 shardElectionTimeoutFactor(10));
767
768         Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
769         leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
770
771         // Submit all tx's - the messages should get queued for retry.
772
773         ListenableFuture<Boolean> writeTx2CanCommit = writeTx2Cohort.canCommit();
774         DOMStoreThreePhaseCommitCohort writeTx3Cohort = writeTx3.ready();
775         DOMStoreThreePhaseCommitCohort writeTx4Cohort = writeTx4.ready();
776         DOMStoreThreePhaseCommitCohort rwTxCohort = readWriteTx.ready();
777
778         // Enable elections on the other follower so it becomes the leader, at which point the
779         // tx's should get forwarded from the previous leader to the new leader to complete the commits.
780
781         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
782                 customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
783
784         followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
785         followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort);
786         followerTestKit.doCommit(writeTx3Cohort);
787         followerTestKit.doCommit(writeTx4Cohort);
788         followerTestKit.doCommit(rwTxCohort);
789
790         DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
791         verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
792         verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
793     }
794
795     @Test
796     public void testLeadershipTransferOnShutdown() throws Exception {
797         leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
798         followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
799         String testName = "testLeadershipTransferOnShutdown";
800         initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
801
802         IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
803         try (DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
804                 MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
805
806             // Create and submit a couple tx's so they're pending.
807
808             DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
809             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
810             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
811             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
812             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
813
814             IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
815                     stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
816
817             writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
818             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
819             writeTx.write(CarsModel.newCarPath("optima"), car);
820             DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
821
822             IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
823                     stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
824
825             // Gracefully stop the leader via a Shutdown message.
826
827             sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
828                 shardElectionTimeoutFactor(100));
829
830             FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
831             Future<ActorRef> future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars");
832             ActorRef leaderActor = Await.result(future, duration);
833
834             Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
835
836             // Commit the 2 transactions. They should finish and succeed.
837
838             followerTestKit.doCommit(cohort1);
839             followerTestKit.doCommit(cohort2);
840
841             // Wait for the leader actor stopped.
842
843             Boolean stopped = Await.result(stopFuture, duration);
844             assertEquals("Stopped", Boolean.TRUE, stopped);
845
846             // Verify leadership was transferred by reading the committed data from the other nodes.
847
848             verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car);
849             verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car);
850         }
851     }
852
853     @Test
854     public void testTransactionWithIsolatedLeader() throws Throwable {
855         // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
856         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
857         String testName = "testTransactionWithIsolatedLeader";
858         initDatastoresWithCars(testName);
859
860         // Tx that is submitted after the follower is stopped but before the leader transitions to IsolatedLeader.
861         DOMStoreWriteTransaction preIsolatedLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
862         preIsolatedLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
863
864         // Tx that is submitted after the leader transitions to IsolatedLeader.
865         DOMStoreWriteTransaction noShardLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
866         noShardLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
867
868         // Tx that is submitted after the follower is reinstated.
869         DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
870         successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
871
872         // Stop the follower
873         followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
874         followerDistributedDataStore.close();
875         followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
876
877         // Submit the preIsolatedLeaderWriteTx so it's pending
878         DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
879
880         // Change the isolated leader check interval low so it changes to IsolatedLeader.
881         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
882                 shardIsolatedLeaderCheckIntervalInMillis(200));
883
884         MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
885                 raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
886
887         try {
888             leaderTestKit.doCommit(noShardLeaderWriteTx.ready());
889             fail("Expected NoShardLeaderException");
890         } catch (ExecutionException e) {
891             assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass());
892         }
893
894         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
895                 shardElectionTimeoutFactor(100));
896
897         DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready();
898
899         followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName,
900                 MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
901
902         leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
903         leaderTestKit.doCommit(successTxCohort);
904     }
905
906     @Test(expected=AskTimeoutException.class)
907     public void testTransactionWithShardLeaderNotResponding() throws Throwable {
908         initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
909
910         // Do an initial read to get the primary shard info cached.
911
912         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
913         readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
914
915         // Shutdown the leader and try to create a new tx.
916
917         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
918
919         followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
920         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
921
922         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
923
924         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
925
926         try {
927             followerTestKit.doCommit(rwTx.ready());
928         } catch (ExecutionException e) {
929             assertTrue("Expected ShardLeaderNotRespondingException cause. Actual: " + e.getCause(),
930                     e.getCause() instanceof ShardLeaderNotRespondingException);
931             assertNotNull("Expected a nested cause", e.getCause().getCause());
932             throw e.getCause().getCause();
933         }
934     }
935
936     @Test(expected=NoShardLeaderException.class)
937     public void testTransactionWithCreateTxFailureDueToNoLeader() throws Throwable {
938         initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
939
940         // Do an initial read to get the primary shard info cached.
941
942         DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
943         readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
944
945         // Shutdown the leader and try to create a new tx.
946
947         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
948
949         Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
950
951         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
952
953         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
954                 operationTimeoutInMillis(10).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
955
956         DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
957
958         rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
959
960         try {
961             followerTestKit.doCommit(rwTx.ready());
962         } catch (ExecutionException e) {
963             throw e.getCause();
964         }
965     }
966
967     @Test
968     public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
969         String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
970         initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS);
971
972         DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder().
973                 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
974         IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
975
976         try (DistributedDataStore ds =
977                 follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) {
978
979             followerTestKit.waitForMembersUp("member-1", "member-3");
980             follower2TestKit.waitForMembersUp("member-1", "member-2");
981
982             // Do an initial read to get the primary shard info cached.
983
984             DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
985             readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
986
987             // Shutdown the leader and try to create a new tx.
988
989             JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
990
991             Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
992
993             sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
994                 operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
995
996             DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
997
998             rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
999
1000             followerTestKit.doCommit(rwTx.ready());
1001         }
1002     }
1003
1004     private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
1005         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
1006         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
1007         Answer<DatastoreContext> answer = invocation -> newBuilder.build();
1008         Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
1009         Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
1010         dataStore.onDatastoreContextUpdated(mockContextFactory);
1011     }
1012 }