/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.eq;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.verify;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.pattern.AskTimeoutException;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import com.typesafe.config.ConfigFactory;
30 import java.math.BigInteger;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.mockito.Mockito;
37 import org.mockito.invocation.InvocationOnMock;
38 import org.mockito.stubbing.Answer;
39 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
42 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
44 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
45 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
47 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
48 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
49 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
50 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
51 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
52 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
53 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
54 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
56 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
57 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
58 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
59 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
60 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
61 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
62 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
64 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
65 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
66 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
67 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
68 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
69 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
70 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
71 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
72 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
73 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
74 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
/**
 * End-to-end distributed data store tests that exercise remote shards and transactions.
 *
 * @author Thomas Pantelis
 */
81 public class DistributedDataStoreRemotingIntegrationTest {
83 private static final String[] SHARD_NAMES = {"cars", "people"};
85 private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
86 private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
88 private static final String MODULE_SHARDS_CONFIG_2 = "module-shards-member1-and-2.conf";
89 private static final String MODULE_SHARDS_CONFIG_3 = "module-shards-member1-and-2-and-3.conf";
91 private ActorSystem leaderSystem;
92 private ActorSystem followerSystem;
93 private ActorSystem follower2System;
95 private final DatastoreContext.Builder leaderDatastoreContextBuilder =
96 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
98 private final DatastoreContext.Builder followerDatastoreContextBuilder =
99 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
101 private DistributedDataStore followerDistributedDataStore;
102 private DistributedDataStore leaderDistributedDataStore;
103 private IntegrationTestKit followerTestKit;
104 private IntegrationTestKit leaderTestKit;
107 public void setUp() {
108 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
109 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
111 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
112 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
114 follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
115 Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
119 public void tearDown() {
120 JavaTestKit.shutdownActorSystem(leaderSystem);
121 JavaTestKit.shutdownActorSystem(followerSystem);
122 JavaTestKit.shutdownActorSystem(follower2System);
125 private void initDatastores(String type) {
126 initDatastores(type, MODULE_SHARDS_CONFIG_2);
129 private void initDatastores(String type, String moduleShardsConfig) {
130 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
132 leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES);
134 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
135 followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES);
137 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
140 private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
141 Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
142 assertEquals("isPresent", true, optional.isPresent());
144 CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME);
145 for(NormalizedNode<?, ?> entry: entries) {
146 listBuilder.withChild((MapEntryNode) entry);
149 assertEquals("Car list node", listBuilder.build(), optional.get());
152 private static void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path, NormalizedNode<?, ?> expNode)
154 Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
155 assertEquals("isPresent", true, optional.isPresent());
156 assertEquals("Data node", expNode, optional.get());
159 private static void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
160 Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
161 assertEquals("exists", true, exists);
165 public void testWriteTransactionWithSingleShard() throws Exception {
166 String testName = "testWriteTransactionWithSingleShard";
167 initDatastores(testName);
169 String followerCarShardName = "member-2-shard-cars-" + testName;
170 InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
172 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
173 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
175 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
176 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
178 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
179 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
180 writeTx.merge(car1Path, car1);
182 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
183 YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
184 writeTx.merge(car2Path, car2);
186 followerTestKit.doCommit(writeTx.ready());
188 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
190 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
194 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
196 writeTx.delete(car1Path);
198 followerTestKit.doCommit(writeTx.ready());
200 verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);
202 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
204 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);
206 // Re-instate the follower member 2 as a single-node to verify replication and recovery.
208 InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
210 JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
211 JavaTestKit.shutdownActorSystem(followerSystem, null, true);
213 ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
215 DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder).
216 setupDistributedDataStore(testName, "module-shards-member2", true, SHARD_NAMES);
218 verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
220 JavaTestKit.shutdownActorSystem(newSystem);
224 public void testReadWriteTransactionWithSingleShard() throws Exception {
225 initDatastores("testReadWriteTransactionWithSingleShard");
227 DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
228 assertNotNull("newReadWriteTransaction returned null", rwTx);
230 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
231 rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
233 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
234 rwTx.merge(CarsModel.newCarPath("optima"), car1);
236 verifyCars(rwTx, car1);
238 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
239 YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
240 rwTx.merge(car2Path, car2);
242 verifyExists(rwTx, car2Path);
244 followerTestKit.doCommit(rwTx.ready());
246 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
250 public void testWriteTransactionWithMultipleShards() throws Exception {
251 initDatastores("testWriteTransactionWithMultipleShards");
253 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
254 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
256 YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
257 NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
258 writeTx.write(carsPath, carsNode);
260 YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
261 NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
262 writeTx.write(peoplePath, peopleNode);
264 followerTestKit.doCommit(writeTx.ready());
266 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
268 verifyNode(readTx, carsPath, carsNode);
269 verifyNode(readTx, peoplePath, peopleNode);
273 public void testReadWriteTransactionWithMultipleShards() throws Exception {
274 initDatastores("testReadWriteTransactionWithMultipleShards");
276 DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
277 assertNotNull("newReadWriteTransaction returned null", rwTx);
279 YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
280 NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
281 rwTx.write(carsPath, carsNode);
283 YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
284 NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
285 rwTx.write(peoplePath, peopleNode);
287 followerTestKit.doCommit(rwTx.ready());
289 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
291 verifyNode(readTx, carsPath, carsNode);
292 verifyNode(readTx, peoplePath, peopleNode);
296 public void testTransactionChainWithSingleShard() throws Exception {
297 initDatastores("testTransactionChainWithSingleShard");
299 DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
301 // Add the top-level cars container with write-only.
303 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
304 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
306 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
310 // Verify the top-level cars container with read-only.
312 verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
314 // Perform car operations with read-write.
316 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
318 verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
320 rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
322 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
323 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
324 rwTx.write(car1Path, car1);
326 verifyExists(rwTx, car1Path);
328 verifyCars(rwTx, car1);
330 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
331 rwTx.merge(CarsModel.newCarPath("sportage"), car2);
333 rwTx.delete(car1Path);
335 followerTestKit.doCommit(rwTx.ready());
339 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
343 public void testTransactionChainWithMultipleShards() throws Exception{
344 initDatastores("testTransactionChainWithMultipleShards");
346 DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
348 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
349 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
351 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
352 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
354 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
355 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
357 followerTestKit.doCommit(writeTx.ready());
359 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
361 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
362 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
363 readWriteTx.write(carPath, car);
365 MapEntryNode person = PeopleModel.newPersonEntry("jack");
366 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
367 readWriteTx.merge(personPath, person);
369 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
370 assertEquals("isPresent", true, optional.isPresent());
371 assertEquals("Data node", car, optional.get());
373 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
374 assertEquals("isPresent", true, optional.isPresent());
375 assertEquals("Data node", person, optional.get());
377 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
379 writeTx = txChain.newWriteOnlyTransaction();
381 writeTx.delete(personPath);
383 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
385 followerTestKit.doCommit(cohort2);
386 followerTestKit.doCommit(cohort3);
390 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
391 verifyCars(readTx, car);
393 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
394 assertEquals("isPresent", false, optional.isPresent());
398 public void testChainedTransactionFailureWithSingleShard() throws Exception {
399 initDatastores("testChainedTransactionFailureWithSingleShard");
401 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
402 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
403 LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
404 MoreExecutors.directExecutor());
406 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
407 DOMTransactionChain txChain = broker.createTransactionChain(listener);
409 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
411 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
412 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
413 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
415 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
418 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
419 fail("Expected TransactionCommitFailedException");
420 } catch (TransactionCommitFailedException e) {
424 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
431 public void testChainedTransactionFailureWithMultipleShards() throws Exception {
432 initDatastores("testChainedTransactionFailureWithMultipleShards");
434 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
435 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
436 LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
437 MoreExecutors.directExecutor());
439 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
440 DOMTransactionChain txChain = broker.createTransactionChain(listener);
442 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
444 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
446 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
447 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
448 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
450 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
451 // done for put for performance reasons.
452 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
455 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
456 fail("Expected TransactionCommitFailedException");
457 } catch (TransactionCommitFailedException e) {
461 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
468 public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
469 String testName = "testSingleShardTransactionsWithLeaderChanges";
470 initDatastores(testName);
472 String followerCarShardName = "member-2-shard-cars-" + testName;
473 InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
475 // Write top-level car container from the follower so it uses a remote Tx.
477 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
479 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
480 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
482 followerTestKit.doCommit(writeTx.ready());
484 InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
486 // Switch the leader to the follower
488 followerDatastoreContextBuilder.shardElectionTimeoutFactor(1);
489 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
491 JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
493 followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
495 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
496 Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
498 DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
499 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
500 IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
501 newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_2, false, SHARD_NAMES);
503 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
505 // Write a car entry to the new leader - should switch to local Tx
507 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
509 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
510 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
511 writeTx.merge(car1Path, car1);
513 followerTestKit.doCommit(writeTx.ready());
515 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
519 public void testReadyLocalTransactionForwardedToLeader() throws Exception {
520 initDatastores("testReadyLocalTransactionForwardedToLeader");
521 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
523 Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
524 assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
526 TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
527 dataTree.setSchemaContext(SchemaContextHelper.full());
528 DataTreeModification modification = dataTree.takeSnapshot().newModification();
530 new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
531 new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
533 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
534 new WriteModification(CarsModel.newCarPath("optima"), car).apply(modification);
536 String transactionID = "tx-1";
537 ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(transactionID , modification, true);
539 carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
540 Object resp = followerTestKit.expectMsgClass(Object.class);
541 if(resp instanceof akka.actor.Status.Failure) {
542 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
545 assertTrue("Expected response of type " + CommitTransactionReply.SERIALIZABLE_CLASS,
546 CommitTransactionReply.SERIALIZABLE_CLASS.equals(resp.getClass()));
548 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car);
551 @Test(expected=NoShardLeaderException.class)
552 public void testTransactionWithIsolatedLeader() throws Throwable {
553 leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(300);
554 String testName = "testTransactionWithIsolatedLeader";
555 initDatastores(testName);
557 JavaTestKit.shutdownActorSystem(followerSystem, null, true);
559 Uninterruptibles.sleepUninterruptibly(leaderDistributedDataStore.getActorContext().getDatastoreContext()
560 .getShardRaftConfig().getElectionTimeOutInterval().toMillis() * 3, TimeUnit.MILLISECONDS);
562 DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction();
563 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
566 followerTestKit.doCommit(writeTx.ready());
567 } catch (ExecutionException e) {
572 @Test(expected=AskTimeoutException.class)
573 public void testTransactionWithShardLeaderNotResponding() throws Throwable {
574 followerDatastoreContextBuilder.shardElectionTimeoutFactor(30);
575 initDatastores("testTransactionWithShardLeaderNotResponding");
577 // Do an initial read to get the primary shard info cached.
579 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
580 readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
582 // Shutdown the leader and try to create a new tx.
584 JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
586 followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
587 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
589 DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
591 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
594 followerTestKit.doCommit(rwTx.ready());
595 } catch (ExecutionException e) {
596 assertTrue("Expected ShardLeaderNotRespondingException cause. Actual: " + e.getCause(),
597 e.getCause() instanceof ShardLeaderNotRespondingException);
598 assertNotNull("Expected a nested cause", e.getCause().getCause());
599 throw e.getCause().getCause();
603 @Test(expected=NoShardLeaderException.class)
604 public void testTransactionWithCreateTxFailureDueToNoLeader() throws Throwable {
605 initDatastores("testTransactionWithCreateTxFailureDueToNoLeader");
607 // Do an initial read to get the primary shard info cached.
609 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
610 readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
612 // Shutdown the leader and try to create a new tx.
614 JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
616 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
618 followerDatastoreContextBuilder.operationTimeoutInMillis(10).shardElectionTimeoutFactor(1);
619 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
621 DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
623 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
626 followerTestKit.doCommit(rwTx.ready());
627 } catch (ExecutionException e) {
633 public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
634 followerDatastoreContextBuilder.shardElectionTimeoutFactor(30);
635 String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
636 initDatastores(testName, MODULE_SHARDS_CONFIG_3);
638 DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder().
639 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
640 IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
641 follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_3, false, SHARD_NAMES);
643 // Do an initial read to get the primary shard info cached.
645 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
646 readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
648 // Shutdown the leader and try to create a new tx.
650 JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
652 followerDatastoreContextBuilder.operationTimeoutInMillis(500);
653 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
655 DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
657 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
659 followerTestKit.doCommit(rwTx.ready());
662 private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
663 DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
664 Answer<DatastoreContext> answer = new Answer<DatastoreContext>() {
666 public DatastoreContext answer(InvocationOnMock invocation) {
667 return builder.build();
670 Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
671 Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
672 dataStore.onDatastoreContextUpdated(mockContextFactory);