2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.io.IOException;
32 import java.math.BigInteger;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.AfterClass;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
52 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
53 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.Snapshot;
56 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
57 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
58 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
59 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
60 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
61 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
62 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
63 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
64 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
65 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
66 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
67 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
68 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
69 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
72 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
74 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
76 import org.opendaylight.yangtools.concepts.ListenerRegistration;
77 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
78 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
79 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
80 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
81 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
82 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
83 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
84 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
86 public class DistributedDataStoreIntegrationTest {
88 private static ActorSystem system;
90 private final DatastoreContext.Builder datastoreContextBuilder =
91 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
94 public static void setUpClass() throws IOException {
95 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
96 Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
97 Cluster.get(system).join(member1Address);
101 public static void tearDownClass() throws IOException {
102 JavaTestKit.shutdownActorSystem(system);
106 protected ActorSystem getSystem() {
111 public void testWriteTransactionWithSingleShard() throws Exception{
112 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
113 DistributedDataStore dataStore =
114 setupDistributedDataStore("transactionIntegrationTest", "test-1");
116 testWriteTransaction(dataStore, TestModel.TEST_PATH,
117 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
119 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
120 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
127 public void testWriteTransactionWithMultipleShards() throws Exception{
128 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
129 DistributedDataStore dataStore =
130 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
132 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
133 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
135 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
136 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
138 doCommit(writeTx.ready());
140 writeTx = dataStore.newWriteOnlyTransaction();
142 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
143 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
145 doCommit(writeTx.ready());
147 writeTx = dataStore.newWriteOnlyTransaction();
149 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
150 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
151 writeTx.write(carPath, car);
153 MapEntryNode person = PeopleModel.newPersonEntry("jack");
154 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
155 writeTx.write(personPath, person);
157 doCommit(writeTx.ready());
159 // Verify the data in the store
161 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
163 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
164 assertEquals("isPresent", true, optional.isPresent());
165 assertEquals("Data node", car, optional.get());
167 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
168 assertEquals("isPresent", true, optional.isPresent());
169 assertEquals("Data node", person, optional.get());
176 public void testReadWriteTransactionWithSingleShard() throws Exception{
177 System.setProperty("shard.persistent", "true");
178 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
179 DistributedDataStore dataStore =
180 setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
182 // 1. Create a read-write Tx
184 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
185 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
187 // 2. Write some data
189 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
190 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
191 readWriteTx.write(nodePath, nodeToWrite );
193 // 3. Read the data from Tx
195 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
196 assertEquals("exists", true, exists);
198 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
199 assertEquals("isPresent", true, optional.isPresent());
200 assertEquals("Data node", nodeToWrite, optional.get());
202 // 4. Ready the Tx for commit
204 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
210 // 6. Verify the data in the store
212 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
214 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
215 assertEquals("isPresent", true, optional.isPresent());
216 assertEquals("Data node", nodeToWrite, optional.get());
223 public void testReadWriteTransactionWithMultipleShards() throws Exception{
224 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
225 DistributedDataStore dataStore =
226 setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
228 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
229 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
231 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
232 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
234 doCommit(readWriteTx.ready());
236 readWriteTx = dataStore.newReadWriteTransaction();
238 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
239 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
241 doCommit(readWriteTx.ready());
243 readWriteTx = dataStore.newReadWriteTransaction();
245 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
246 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
247 readWriteTx.write(carPath, car);
249 MapEntryNode person = PeopleModel.newPersonEntry("jack");
250 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
251 readWriteTx.write(personPath, person);
253 Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
254 assertEquals("exists", true, exists);
256 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
257 assertEquals("isPresent", true, optional.isPresent());
258 assertEquals("Data node", car, optional.get());
260 doCommit(readWriteTx.ready());
262 // Verify the data in the store
264 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
266 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
267 assertEquals("isPresent", true, optional.isPresent());
268 assertEquals("Data node", car, optional.get());
270 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
271 assertEquals("isPresent", true, optional.isPresent());
272 assertEquals("Data node", person, optional.get());
279 public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
280 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
281 DistributedDataStore dataStore = setupDistributedDataStore(
282 "testSingleTransactionsWritesInQuickSuccession", "cars-1");
284 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
286 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
287 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
288 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
289 doCommit(writeTx.ready());
291 writeTx = txChain.newWriteOnlyTransaction();
294 for(int i = 0; i < nCars; i++) {
295 writeTx.write(CarsModel.newCarPath("car" + i),
296 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
299 doCommit(writeTx.ready());
301 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
302 CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
303 assertEquals("isPresent", true, optional.isPresent());
304 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
310 private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
311 final boolean writeOnly) throws Exception {
312 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
313 String shardName = "test-1";
315 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
316 // initialized until we create and submit the write the Tx.
317 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
318 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
319 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
321 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
323 // Create the write Tx
325 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
326 dataStore.newReadWriteTransaction();
327 assertNotNull("newReadWriteTransaction returned null", writeTx);
329 // Do some modification operations and ready the Tx on a separate thread.
331 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
332 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
333 TestModel.ID_QNAME, 1).build();
335 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
336 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
337 final CountDownLatch txReady = new CountDownLatch(1);
338 Thread txThread = new Thread() {
342 writeTx.write(TestModel.TEST_PATH,
343 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
345 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
346 TestModel.OUTER_LIST_QNAME).build());
348 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
349 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
351 writeTx.delete(listEntryPath);
353 txCohort.set(writeTx.ready());
354 } catch(Exception e) {
365 // Wait for the Tx operations to complete.
367 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
368 if(caughtEx.get() != null) {
369 throw caughtEx.get();
372 assertEquals("Tx ready", true, done);
374 // At this point the Tx operations should be waiting for the shard to initialize so
375 // trigger the latch to let the shard recovery to continue.
377 blockRecoveryLatch.countDown();
379 // Wait for the Tx commit to complete.
381 doCommit(txCohort.get());
383 // Verify the data in the store
385 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
387 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
388 get(5, TimeUnit.SECONDS);
389 assertEquals("isPresent", true, optional.isPresent());
391 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
392 assertEquals("isPresent", true, optional.isPresent());
394 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
395 assertEquals("isPresent", false, optional.isPresent());
402 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
403 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
404 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
408 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
409 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
413 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
414 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
415 String testName = "testTransactionReadsWithShardNotInitiallyReady";
416 String shardName = "test-1";
418 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
419 // initialized until we create the Tx.
420 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
421 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
422 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
424 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
426 // Create the read-write Tx
428 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
429 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
431 // Do some reads on the Tx on a separate thread.
433 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
434 new AtomicReference<>();
435 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
436 txReadFuture = new AtomicReference<>();
437 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
438 final CountDownLatch txReadsDone = new CountDownLatch(1);
439 Thread txThread = new Thread() {
443 readWriteTx.write(TestModel.TEST_PATH,
444 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
446 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
448 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
449 } catch(Exception e) {
453 txReadsDone.countDown();
460 // Wait for the Tx operations to complete.
462 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
463 if(caughtEx.get() != null) {
464 throw caughtEx.get();
467 assertEquals("Tx reads done", true, done);
469 // At this point the Tx operations should be waiting for the shard to initialize so
470 // trigger the latch to let the shard recovery to continue.
472 blockRecoveryLatch.countDown();
474 // Wait for the reads to complete and verify.
476 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
477 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
485 @Test(expected=NotInitializedException.class)
486 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
487 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
488 String testName = "testTransactionCommitFailureWithShardNotInitialized";
489 String shardName = "test-1";
491 // Set the shard initialization timeout low for the test.
493 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
495 // Setup the InMemoryJournal to block shard recovery indefinitely.
497 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
498 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
499 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
501 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
503 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
505 // Create the write Tx
507 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
508 assertNotNull("newReadWriteTransaction returned null", writeTx);
510 // Do some modifications and ready the Tx on a separate thread.
512 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
513 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
514 final CountDownLatch txReady = new CountDownLatch(1);
515 Thread txThread = new Thread() {
519 writeTx.write(TestModel.TEST_PATH,
520 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
522 txCohort.set(writeTx.ready());
523 } catch(Exception e) {
534 // Wait for the Tx operations to complete.
536 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
537 if(caughtEx.get() != null) {
538 throw caughtEx.get();
541 assertEquals("Tx ready", true, done);
543 // Wait for the commit to complete. Since the shard never initialized, the Tx should
544 // have timed out and throw an appropriate exception cause.
547 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
548 } catch(ExecutionException e) {
551 blockRecoveryLatch.countDown();
557 @Test(expected=NotInitializedException.class)
558 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
559 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
560 String testName = "testTransactionReadFailureWithShardNotInitialized";
561 String shardName = "test-1";
563 // Set the shard initialization timeout low for the test.
565 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
567 // Setup the InMemoryJournal to block shard recovery indefinitely.
569 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
570 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
571 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
573 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
575 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
577 // Create the read-write Tx
579 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
580 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
582 // Do a read on the Tx on a separate thread.
584 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
585 txReadFuture = new AtomicReference<>();
586 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
587 final CountDownLatch txReadDone = new CountDownLatch(1);
588 Thread txThread = new Thread() {
592 readWriteTx.write(TestModel.TEST_PATH,
593 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
595 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
598 } catch(Exception e) {
602 txReadDone.countDown();
609 // Wait for the Tx operations to complete.
611 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
612 if(caughtEx.get() != null) {
613 throw caughtEx.get();
616 assertEquals("Tx read done", true, done);
618 // Wait for the read to complete. Since the shard never initialized, the Tx should
619 // have timed out and throw an appropriate exception cause.
622 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
623 } catch(ReadFailedException e) {
626 blockRecoveryLatch.countDown();
632 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) throws Throwable {
633 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
634 String shardName = "default";
636 // We don't want the shard to become the leader so prevent shard elections.
637 datastoreContextBuilder.customRaftPolicyImplementation(
638 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
640 // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
641 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1).
642 shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
644 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
646 Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(),
647 new FindLocalShard(shardName, true));
648 assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
650 // Create the write Tx.
652 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
653 dataStore.newReadWriteTransaction();
654 assertNotNull("newReadWriteTransaction returned null", writeTx);
656 // Do some modifications and ready the Tx on a separate thread.
658 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
659 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
660 final CountDownLatch txReady = new CountDownLatch(1);
661 Thread txThread = new Thread() {
665 writeTx.write(TestModel.JUNK_PATH,
666 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
668 txCohort.set(writeTx.ready());
669 } catch(Exception e) {
680 // Wait for the Tx operations to complete.
682 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
683 if(caughtEx.get() != null) {
684 throw caughtEx.get();
687 assertEquals("Tx ready", true, done);
689 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
690 // should have timed out and throw an appropriate exception cause.
693 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
694 } catch(ExecutionException e) {
702 @Test(expected=NoShardLeaderException.class)
703 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
704 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
705 testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
708 @Test(expected=NoShardLeaderException.class)
709 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
710 testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
714 public void testTransactionAbort() throws Exception{
715 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
716 DistributedDataStore dataStore =
717 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
719 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
720 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
722 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
724 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
726 cohort.canCommit().get(5, TimeUnit.SECONDS);
728 cohort.abort().get(5, TimeUnit.SECONDS);
730 testWriteTransaction(dataStore, TestModel.TEST_PATH,
731 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
738 public void testTransactionChainWithSingleShard() throws Exception{
739 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
740 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
742 // 1. Create a Tx chain and write-only Tx
744 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
746 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
747 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
749 // 2. Write some data
751 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
752 writeTx.write(TestModel.TEST_PATH, testNode);
754 // 3. Ready the Tx for commit
756 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
758 // 4. Commit the Tx on another thread that first waits for the second read Tx.
760 final CountDownLatch continueCommit1 = new CountDownLatch(1);
761 final CountDownLatch commit1Done = new CountDownLatch(1);
762 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
767 continueCommit1.await();
769 } catch (Exception e) {
772 commit1Done.countDown();
777 // 5. Create a new read Tx from the chain to read and verify the data from the first
778 // Tx is visible after being readied.
780 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
781 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
782 assertEquals("isPresent", true, optional.isPresent());
783 assertEquals("Data node", testNode, optional.get());
785 // 6. Create a new RW Tx from the chain, write more data, and ready it
787 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
788 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
789 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
791 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
793 // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
794 // verify it is visible.
796 readTx = txChain.newReadWriteTransaction();
797 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
798 assertEquals("isPresent", true, optional.isPresent());
799 assertEquals("Data node", outerNode, optional.get());
801 // 8. Wait for the 2 commits to complete and close the chain.
803 continueCommit1.countDown();
804 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
806 if(commit1Error.get() != null) {
807 throw commit1Error.get();
814 // 9. Create a new read Tx from the data store and verify committed data.
816 readTx = dataStore.newReadOnlyTransaction();
817 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
818 assertEquals("isPresent", true, optional.isPresent());
819 assertEquals("Data node", outerNode, optional.get());
826 public void testTransactionChainWithMultipleShards() throws Exception{
827 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
828 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
829 "cars-1", "people-1");
831 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
833 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
834 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
836 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
837 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
839 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
840 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
842 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
844 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
846 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
847 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
848 readWriteTx.write(carPath, car);
850 MapEntryNode person = PeopleModel.newPersonEntry("jack");
851 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
852 readWriteTx.merge(personPath, person);
854 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
855 assertEquals("isPresent", true, optional.isPresent());
856 assertEquals("Data node", car, optional.get());
858 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
859 assertEquals("isPresent", true, optional.isPresent());
860 assertEquals("Data node", person, optional.get());
862 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
864 writeTx = txChain.newWriteOnlyTransaction();
866 writeTx.delete(carPath);
868 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
870 ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
871 ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
873 doCommit(canCommit1, cohort1);
874 doCommit(canCommit2, cohort2);
879 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
881 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
882 assertEquals("isPresent", false, optional.isPresent());
884 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
885 assertEquals("isPresent", true, optional.isPresent());
886 assertEquals("Data node", person, optional.get());
893 public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
894 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
895 DistributedDataStore dataStore = setupDistributedDataStore(
896 "testCreateChainedTransactionsInQuickSuccession", "cars-1");
898 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
899 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
900 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
902 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
903 DOMTransactionChain txChain = broker.createTransactionChain(listener);
905 List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
907 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
908 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
909 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
910 futures.add(writeTx.submit());
913 for(int i = 0; i < nCars; i++) {
914 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
916 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
917 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
919 futures.add(rwTx.submit());
922 for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
926 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
927 LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
928 assertEquals("isPresent", true, optional.isPresent());
929 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
940 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
941 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
942 DistributedDataStore dataStore = setupDistributedDataStore(
943 "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
945 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
947 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
951 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
953 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
954 assertEquals("isPresent", false, optional.isPresent());
963 public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
964 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
965 DistributedDataStore dataStore = setupDistributedDataStore(
966 "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
968 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
970 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
971 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
973 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
975 // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
978 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
983 public void testCreateChainedTransactionAfterClose() throws Throwable {
984 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
985 DistributedDataStore dataStore = setupDistributedDataStore(
986 "testCreateChainedTransactionAfterClose", "test-1");
988 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
992 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
994 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
999 public void testChainWithReadOnlyTxAfterPreviousReady() throws Throwable {
1000 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1001 DistributedDataStore dataStore = setupDistributedDataStore(
1002 "testChainWithReadOnlyTxAfterPreviousReady", "test-1");
1004 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1006 // Create a write tx and submit.
1008 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1009 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1010 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1012 // Create read-only tx's and issue a read.
1014 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture1 =
1015 txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH);
1017 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture2 =
1018 txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH);
1020 // Create another write tx and issue the write.
1022 DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1023 writeTx2.write(TestModel.OUTER_LIST_PATH,
1024 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1026 // Ensure the reads succeed.
1028 assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent());
1029 assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent());
1031 // Ensure the writes succeed.
1033 DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
1038 assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH).
1039 checkedGet(5, TimeUnit.SECONDS).isPresent());
1044 public void testChainedTransactionFailureWithSingleShard() throws Exception{
1045 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1046 DistributedDataStore dataStore = setupDistributedDataStore(
1047 "testChainedTransactionFailureWithSingleShard", "cars-1");
1049 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1050 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1051 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1053 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1054 DOMTransactionChain txChain = broker.createTransactionChain(listener);
1056 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
1058 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1059 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1060 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1062 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1065 rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1066 fail("Expected TransactionCommitFailedException");
1067 } catch (TransactionCommitFailedException e) {
1071 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
1080 public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1081 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1082 DistributedDataStore dataStore = setupDistributedDataStore(
1083 "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
1085 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1086 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1087 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1089 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1090 DOMTransactionChain txChain = broker.createTransactionChain(listener);
1092 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1094 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1096 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1097 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1098 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1100 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1101 // done for put for performance reasons.
1102 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1105 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1106 fail("Expected TransactionCommitFailedException");
1107 } catch (TransactionCommitFailedException e) {
1111 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
1120 public void testChangeListenerRegistration() throws Exception{
1121 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1122 DistributedDataStore dataStore =
1123 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
1125 testWriteTransaction(dataStore, TestModel.TEST_PATH,
1126 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1128 MockDataChangeListener listener = new MockDataChangeListener(1);
1130 ListenerRegistration<MockDataChangeListener>
1131 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1132 DataChangeScope.SUBTREE);
1134 assertNotNull("registerChangeListener returned null", listenerReg);
1136 // Wait for the initial notification
1138 listener.waitForChangeEvents(TestModel.TEST_PATH);
1144 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1145 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1147 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1148 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1149 testWriteTransaction(dataStore, listPath,
1150 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1152 // Wait for the 2 updates.
1154 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1156 listenerReg.close();
1158 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1159 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1160 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1162 listener.expectNoMoreChanges("Received unexpected change after close");
1169 public void testRestoreFromDatastoreSnapshot() throws Exception{
1170 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1171 String name = "transactionIntegrationTest";
1173 ContainerNode carsNode = CarsModel.newCarsNode(CarsModel.newCarsMapNode(
1174 CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1175 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
1177 ShardDataTree dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL);
1178 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1179 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree.getDataTree(),
1180 YangInstanceIdentifier.builder().build());
1182 Snapshot carsSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
1183 Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
1185 NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1186 dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL);
1187 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1188 root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.builder().build());
1190 Snapshot peopleSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
1191 Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
1193 restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1194 new DatastoreSnapshot.ShardSnapshot("cars",
1195 org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
1196 new DatastoreSnapshot.ShardSnapshot("people",
1197 org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
1199 DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
1200 true, "cars", "people");
1202 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1204 Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1205 assertEquals("isPresent", true, optional.isPresent());
1206 assertEquals("Data node", carsNode, optional.get());
1208 optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1209 assertEquals("isPresent", true, optional.isPresent());
1210 assertEquals("Data node", peopleNode, optional.get());