2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorSystem;
20 import akka.actor.Address;
21 import akka.actor.AddressFromURIString;
22 import akka.cluster.Cluster;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.io.IOException;
32 import java.math.BigInteger;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.AfterClass;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45 import org.mockito.Mockito;
46 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
48 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
49 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
50 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
51 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
52 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
53 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
54 import org.opendaylight.controller.cluster.raft.Snapshot;
55 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
56 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
57 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
58 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
59 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
60 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
61 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
62 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
63 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
64 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
65 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
66 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
67 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
68 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
72 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
74 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
75 import org.opendaylight.yangtools.concepts.ListenerRegistration;
76 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
77 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
79 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
80 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
81 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
82 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
83 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
85 public class DistributedDataStoreIntegrationTest {
// Actor system shared by the tests; assigned in setUpClass() and shut down in tearDownClass().
87 private static ActorSystem system;
// Datastore context builder handed to every IntegrationTestKit in this class. It starts with a
// 100 ms shard heartbeat interval; individual tests adjust further settings on it before use.
89 private final DatastoreContext.Builder datastoreContextBuilder =
90 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
/**
 * Creates the shared "cluster-test" actor system from the "Member1" section of the loaded
 * configuration and joins it to the cluster at akka.tcp://cluster-test@127.0.0.1:2558.
 *
 * @throws IOException declared by the signature
 */
93 public static void setUpClass() throws IOException {
94 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
// NOTE(review): presumably this is the address of the local member itself — confirm against the
// "Member1" configuration.
95 Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
96 Cluster.get(system).join(member1Address);
/**
 * Shuts down the shared actor system through JavaTestKit.
 *
 * @throws IOException declared by the signature
 */
100 public static void tearDownClass() throws IOException {
101 JavaTestKit.shutdownActorSystem(system);
/**
 * Supplies the actor system that the tests pass to each IntegrationTestKit.
 * NOTE(review): presumably returns the static {@code system} field — confirm.
 *
 * @return the actor system used by the tests
 */
105 protected ActorSystem getSystem() {
/**
 * Exercises write transactions against a data store set up with the single shard "test-1":
 * the test container is written first, then an empty outer list, each through the
 * testWriteTransaction helper.
 */
110 public void testWriteTransactionWithSingleShard() throws Exception{
111 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
112 DistributedDataStore dataStore =
113 setupDistributedDataStore("transactionIntegrationTest", "test-1");
// Write the top-level test container.
115 testWriteTransaction(dataStore, TestModel.TEST_PATH,
116 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Write an outer list node that has no entries.
118 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
119 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
/**
 * Exercises write-only transactions that span the "cars-1" and "people-1" shards.
 * Three transactions are committed in sequence (base containers, list nodes, then one car
 * and one person entry) and the entries are read back through a read-only transaction.
 */
126 public void testWriteTransactionWithMultipleShards() throws Exception{
127 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
128 DistributedDataStore dataStore =
129 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
// Tx 1: create the empty top-level containers in both models.
131 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
132 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
134 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
135 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
137 doCommit(writeTx.ready());
// Tx 2: create the car and person list nodes.
139 writeTx = dataStore.newWriteOnlyTransaction();
141 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
142 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
144 doCommit(writeTx.ready());
// Tx 3: write one entry into each list within the same transaction.
146 writeTx = dataStore.newWriteOnlyTransaction();
148 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
149 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
150 writeTx.write(carPath, car);
152 MapEntryNode person = PeopleModel.newPersonEntry("jack");
153 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
154 writeTx.write(personPath, person);
156 doCommit(writeTx.ready());
158 // Verify the data in the store
160 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// Each read is bounded by a 5 second wait on the returned future.
162 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
163 assertEquals("isPresent", true, optional.isPresent());
164 assertEquals("Data node", car, optional.get());
166 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
167 assertEquals("isPresent", true, optional.isPresent());
168 assertEquals("Data node", person, optional.get());
/**
 * Exercises a read-write transaction against the single shard "test-1": data written in the
 * transaction must be visible to exists() and read() on that same transaction, and must be
 * readable from a new read-only transaction at the end of the test.
 */
175 public void testReadWriteTransactionWithSingleShard() throws Exception{
// NOTE(review): this sets a process-wide system property and nothing in this method restores it,
// so it stays in effect for whatever runs afterwards in the same JVM — confirm this is intended.
176 System.setProperty("shard.persistent", "true");
177 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
178 DistributedDataStore dataStore =
179 setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
181 // 1. Create a read-write Tx
183 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
184 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
186 // 2. Write some data
188 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
189 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
190 readWriteTx.write(nodePath, nodeToWrite );
192 // 3. Read the data from Tx
// Both checks run on the still-open transaction, i.e. before it is readied.
194 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
195 assertEquals("exists", true, exists);
197 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
198 assertEquals("isPresent", true, optional.isPresent());
199 assertEquals("Data node", nodeToWrite, optional.get());
201 // 4. Ready the Tx for commit
203 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
209 // 6. Verify the data in the store
211 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
213 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
214 assertEquals("isPresent", true, optional.isPresent());
215 assertEquals("Data node", nodeToWrite, optional.get());
/**
 * Exercises read-write transactions that span the "cars-1" and "people-1" shards.
 * Containers and list nodes are committed first; a third transaction writes one car and one
 * person entry, reads the car back on the open transaction, commits, and the entries are
 * then verified through a read-only transaction.
 */
222 public void testReadWriteTransactionWithMultipleShards() throws Exception{
223 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
224 DistributedDataStore dataStore =
225 setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
// Tx 1: create the empty top-level containers in both models.
227 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
228 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
230 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
231 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
233 doCommit(readWriteTx.ready());
// Tx 2: create the car and person list nodes.
235 readWriteTx = dataStore.newReadWriteTransaction();
237 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
238 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
240 doCommit(readWriteTx.ready());
// Tx 3: write one entry into each list.
242 readWriteTx = dataStore.newReadWriteTransaction();
244 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
245 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
246 readWriteTx.write(carPath, car);
248 MapEntryNode person = PeopleModel.newPersonEntry("jack");
249 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
250 readWriteTx.write(personPath, person);
// The car entry must already be visible on the open transaction, before commit.
252 Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
253 assertEquals("exists", true, exists);
255 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
256 assertEquals("isPresent", true, optional.isPresent());
257 assertEquals("Data node", car, optional.get());
259 doCommit(readWriteTx.ready());
261 // Verify the data in the store
263 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
265 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
266 assertEquals("isPresent", true, optional.isPresent());
267 assertEquals("Data node", car, optional.get());
269 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
270 assertEquals("isPresent", true, optional.isPresent());
271 assertEquals("Data node", person, optional.get());
/**
 * Exercises many writes issued back to back inside one chained write-only transaction on the
 * "cars-1" shard. After the commit, the car list read through the chain must contain
 * {@code nCars} entries.
 */
278 public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
279 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
280 DistributedDataStore dataStore = setupDistributedDataStore(
281 "testSingleTransactionsWritesInQuickSuccession", "cars-1");
283 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// First chained Tx: create the cars container and the (initial) car list.
285 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
286 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
287 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
288 doCommit(writeTx.ready());
// Second chained Tx: write nCars entries named "car0", "car1", ... without waiting in between.
290 writeTx = txChain.newWriteOnlyTransaction();
293 for(int i = 0; i < nCars; i++) {
294 writeTx.write(CarsModel.newCarPath("car" + i),
295 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
298 doCommit(writeTx.ready());
// Read the whole list back through the chain and compare its size with the number written.
300 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
301 CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
302 assertEquals("isPresent", true, optional.isPresent());
303 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
/**
 * Shared scenario for the "shard not initially ready" write tests. Shard recovery is held
 * back with a latch registered on the InMemoryJournal, a transaction performs its
 * modifications and is readied on a separate thread, the latch is then released, and the
 * commit and resulting data are verified.
 *
 * @param testName used both as the data store setup name and as the suffix of the shard's
 *                 persistence id
 * @param writeOnly {@code true} to use a write-only transaction, {@code false} to use a
 *                  read-write transaction
 * @throws Exception any exception captured on the transaction thread, or a failure from
 *                   the commit or read-back
 */
309 private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
310 final boolean writeOnly) throws Exception {
311 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
312 String shardName = "test-1";
314 // Set up the InMemoryJournal to block shard recovery to ensure the shard isn't
315 // initialized until we create and submit the write Tx.
316 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
317 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
318 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// NOTE(review): the 'false' argument presumably tells the setup helper not to wait for the
// shard to become ready — confirm against IntegrationTestKit.
320 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
322 // Create the write Tx
324 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
325 dataStore.newReadWriteTransaction();
// NOTE(review): the failure message names newReadWriteTransaction even when writeOnly is true.
326 assertNotNull("newReadWriteTransaction returned null", writeTx);
328 // Do some modification operations and ready the Tx on a separate thread.
330 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
331 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
332 TestModel.ID_QNAME, 1).build();
// Results are handed back from the worker thread through these holders; txReady signals that
// the worker has finished.
334 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
335 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
336 final CountDownLatch txReady = new CountDownLatch(1);
337 Thread txThread = new Thread() {
// Covers write, merge and delete: the list entry is written and then deleted again, so only the
// container and the outer list are expected to remain afterwards.
341 writeTx.write(TestModel.TEST_PATH,
342 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
344 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
345 TestModel.OUTER_LIST_QNAME).build());
347 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
348 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
350 writeTx.delete(listEntryPath);
352 txCohort.set(writeTx.ready());
353 } catch(Exception e) {
364 // Wait for the Tx operations to complete.
366 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Surface a worker-thread failure before checking the latch result.
367 if(caughtEx.get() != null) {
368 throw caughtEx.get();
371 assertEquals("Tx ready", true, done);
373 // At this point the Tx operations should be waiting for the shard to initialize so
374 // trigger the latch to let the shard recovery continue.
376 blockRecoveryLatch.countDown();
378 // Wait for the Tx commit to complete.
380 doCommit(txCohort.get());
382 // Verify the data in the store
384 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
386 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
387 get(5, TimeUnit.SECONDS);
388 assertEquals("isPresent", true, optional.isPresent());
390 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
391 assertEquals("isPresent", true, optional.isPresent());
// The list entry was deleted within the same Tx, so it must be absent.
393 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
394 assertEquals("isPresent", false, optional.isPresent());
/**
 * Runs the "shard not initially ready" write scenario with a write-only transaction, after
 * enabling write-only transaction optimizations on the datastore context builder.
 */
401 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
402 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
403 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
/**
 * Runs the "shard not initially ready" write scenario with a read-write transaction.
 */
407 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
408 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
/**
 * Exercises exists() and read() on a read-write transaction that is created while shard
 * recovery is still blocked. The calls are issued on a separate thread, recovery is then
 * released, and both futures must complete and report the written node as present.
 */
412 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
413 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
414 String testName = "testTransactionReadsWithShardNotInitiallyReady";
415 String shardName = "test-1";
417 // Set up the InMemoryJournal to block shard recovery to ensure the shard isn't
418 // initialized until we create the Tx.
419 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
420 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
421 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
423 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
425 // Create the read-write Tx
427 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
428 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
430 // Do some reads on the Tx on a separate thread.
// The worker thread only stores the futures here; they are awaited on the test thread after
// recovery has been released.
432 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
433 new AtomicReference<>();
434 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
435 txReadFuture = new AtomicReference<>();
436 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
437 final CountDownLatch txReadsDone = new CountDownLatch(1);
438 Thread txThread = new Thread() {
442 readWriteTx.write(TestModel.TEST_PATH,
443 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
445 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
447 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
448 } catch(Exception e) {
452 txReadsDone.countDown();
459 // Wait for the Tx operations to complete.
461 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
// Surface a worker-thread failure before checking the latch result.
462 if(caughtEx.get() != null) {
463 throw caughtEx.get();
466 assertEquals("Tx reads done", true, done);
468 // At this point the Tx operations should be waiting for the shard to initialize so
469 // trigger the latch to let the shard recovery continue.
471 blockRecoveryLatch.countDown();
473 // Wait for the reads to complete and verify.
475 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
476 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
/**
 * Expects a commit to fail with NotInitializedException when the shard never finishes
 * initializing. Shard recovery is blocked with a latch, the shard initialization timeout is
 * lowered to 300 ms, and canCommit() is awaited on a transaction readied meanwhile.
 * NOTE(review): presumably the cause of the caught ExecutionException is rethrown so that
 * the {@code expected} clause matches — confirm.
 */
484 @Test(expected=NotInitializedException.class)
485 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
486 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
487 String testName = "testTransactionCommitFailureWithShardNotInitialized";
488 String shardName = "test-1";
490 // Set the shard initialization timeout low for the test.
492 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
494 // Set up the InMemoryJournal to block shard recovery indefinitely.
496 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
497 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
498 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// Seed the journal with one entry so that there is something to recover (and block on).
500 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
502 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
504 // Create the write Tx
506 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// NOTE(review): the failure message names newReadWriteTransaction although a write-only Tx is
// created above.
507 assertNotNull("newReadWriteTransaction returned null", writeTx);
509 // Do some modifications and ready the Tx on a separate thread.
511 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
512 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
513 final CountDownLatch txReady = new CountDownLatch(1);
514 Thread txThread = new Thread() {
518 writeTx.write(TestModel.TEST_PATH,
519 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
521 txCohort.set(writeTx.ready());
522 } catch(Exception e) {
533 // Wait for the Tx operations to complete.
535 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Surface a worker-thread failure before checking the latch result.
536 if(caughtEx.get() != null) {
537 throw caughtEx.get();
540 assertEquals("Tx ready", true, done);
542 // Wait for the commit to complete. Since the shard never initialized, the Tx should
543 // have timed out and throw an appropriate exception cause.
546 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
547 } catch(ExecutionException e) {
// Release the blocked recovery so the shard is not left waiting on the latch.
550 blockRecoveryLatch.countDown();
/**
 * Expects a read to fail with NotInitializedException when the shard never finishes
 * initializing. Shard recovery is blocked with a latch, the shard initialization timeout is
 * lowered to 300 ms, and the read future obtained on a separate thread is awaited.
 * NOTE(review): presumably the cause of the caught ReadFailedException is rethrown so that
 * the {@code expected} clause matches — confirm.
 */
556 @Test(expected=NotInitializedException.class)
557 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
558 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
559 String testName = "testTransactionReadFailureWithShardNotInitialized";
560 String shardName = "test-1";
562 // Set the shard initialization timeout low for the test.
564 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
566 // Set up the InMemoryJournal to block shard recovery indefinitely.
568 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
569 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
570 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// Seed the journal with one entry so that there is something to recover (and block on).
572 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
574 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
576 // Create the read-write Tx
578 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
579 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
581 // Do a read on the Tx on a separate thread.
// The worker thread only stores the future here; it is awaited on the test thread below.
583 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
584 txReadFuture = new AtomicReference<>();
585 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
586 final CountDownLatch txReadDone = new CountDownLatch(1);
587 Thread txThread = new Thread() {
591 readWriteTx.write(TestModel.TEST_PATH,
592 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
594 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
597 } catch(Exception e) {
601 txReadDone.countDown();
608 // Wait for the Tx operations to complete.
610 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
// Surface a worker-thread failure before checking the latch result.
611 if(caughtEx.get() != null) {
612 throw caughtEx.get();
615 assertEquals("Tx read done", true, done);
617 // Wait for the read to complete. Since the shard never initialized, the Tx should
618 // have timed out and throw an appropriate exception cause.
621 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
622 } catch(ReadFailedException e) {
// Release the blocked recovery so the shard is not left waiting on the latch.
625 blockRecoveryLatch.countDown();
/**
 * Shared scenario for the "no shard leader" commit-failure tests. Elections are disabled
 * through a custom raft policy so the "default" shard does not become leader, timeouts are
 * lowered, and canCommit() is awaited on a transaction readied on a separate thread.
 *
 * @param writeOnly {@code true} to use a write-only transaction, {@code false} to use a
 *                  read-write transaction
 * @param testName name passed to the data store setup
 * @throws Throwable any exception captured on the transaction thread or raised while
 *                   waiting for canCommit()
 */
631 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) throws Throwable {
632 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
633 String shardName = "default";
635 // We don't want the shard to become the leader so prevent shard elections.
636 datastoreContextBuilder.customRaftPolicyImplementation(
637 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
639 // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
640 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1).
641 shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
643 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
// Ask the shard manager for the local shard and require a LocalShardFound reply before going on.
// NOTE(review): the 'true' flag presumably makes the lookup wait for shard initialization — confirm.
645 Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(),
646 new FindLocalShard(shardName, true));
647 assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
649 // Create the write Tx.
651 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
652 dataStore.newReadWriteTransaction();
// NOTE(review): the failure message names newReadWriteTransaction even when writeOnly is true.
653 assertNotNull("newReadWriteTransaction returned null", writeTx);
655 // Do some modifications and ready the Tx on a separate thread.
657 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
658 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
659 final CountDownLatch txReady = new CountDownLatch(1);
660 Thread txThread = new Thread() {
664 writeTx.write(TestModel.JUNK_PATH,
665 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
667 txCohort.set(writeTx.ready());
668 } catch(Exception e) {
679 // Wait for the Tx operations to complete.
681 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Surface a worker-thread failure before checking the latch result.
682 if(caughtEx.get() != null) {
683 throw caughtEx.get();
686 assertEquals("Tx ready", true, done);
688 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
689 // should have timed out and throw an appropriate exception cause.
692 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
693 } catch(ExecutionException e) {
/**
 * Runs the "no shard leader" commit-failure scenario with a write-only transaction, after
 * enabling write-only transaction optimizations. Expects NoShardLeaderException.
 */
701 @Test(expected=NoShardLeaderException.class)
702 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
703 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
704 testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
/**
 * Runs the "no shard leader" commit-failure scenario with a read-write transaction.
 * Expects NoShardLeaderException.
 */
707 @Test(expected=NoShardLeaderException.class)
708 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
709 testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
/**
 * Exercises aborting a readied transaction on the "test-1" shard: the cohort goes through
 * canCommit() and is then aborted, after which a fresh write to the same path is run through
 * the testWriteTransaction helper.
 */
713 public void testTransactionAbort() throws Exception{
714 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
715 DistributedDataStore dataStore =
716 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
718 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
719 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
721 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
723 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Run the first commit phase, then abort instead of continuing with the commit.
725 cohort.canCommit().get(5, TimeUnit.SECONDS);
727 cohort.abort().get(5, TimeUnit.SECONDS);
// A new write of the same node to the same path follows the abort.
729 testWriteTransaction(dataStore, TestModel.TEST_PATH,
730 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
 * Exercises a transaction chain on the single shard "test-1". Data written by a readied
 * (but not yet committed) chained transaction must be visible to the next transaction in the
 * chain; the first commit is deliberately delayed on another thread until later chained
 * transactions have been created. The outer list is finally verified through a transaction
 * obtained directly from the data store.
 */
737 public void testTransactionChainWithSingleShard() throws Exception{
738 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
739 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
741 // 1. Create a Tx chain and write-only Tx
743 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
745 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
746 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
748 // 2. Write some data
750 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
751 writeTx.write(TestModel.TEST_PATH, testNode);
753 // 3. Ready the Tx for commit
755 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
757 // 4. Commit the Tx on another thread that first waits for the second read Tx.
// continueCommit1 gates the start of the first commit, commit1Done signals that the committing
// thread has finished, and commit1Error carries any exception from it back to the test thread.
759 final CountDownLatch continueCommit1 = new CountDownLatch(1);
760 final CountDownLatch commit1Done = new CountDownLatch(1);
761 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
766 continueCommit1.await();
768 } catch (Exception e) {
771 commit1Done.countDown();
776 // 5. Create a new read Tx from the chain to read and verify the data from the first
777 // Tx is visible after being readied.
779 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
780 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
781 assertEquals("isPresent", true, optional.isPresent());
782 assertEquals("Data node", testNode, optional.get());
784 // 6. Create a new RW Tx from the chain, write more data, and ready it
786 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
787 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
788 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
790 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
792 // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
793 // verify it is visible.
// NOTE(review): a read-write Tx is created here and used only for reading, although the step
// describes a read Tx — confirm this is intended.
795 readTx = txChain.newReadWriteTransaction();
796 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
797 assertEquals("isPresent", true, optional.isPresent());
798 assertEquals("Data node", outerNode, optional.get());
800 // 8. Wait for the 2 commits to complete and close the chain.
// Release the delayed first commit. NOTE(review): the boolean result of the 5 second wait is
// not checked, so a timeout here goes unnoticed unless commit1Error was set.
802 continueCommit1.countDown();
803 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
805 if(commit1Error.get() != null) {
806 throw commit1Error.get();
813 // 9. Create a new read Tx from the data store and verify committed data.
815 readTx = dataStore.newReadOnlyTransaction();
816 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
817 assertEquals("isPresent", true, optional.isPresent());
818 assertEquals("Data node", outerNode, optional.get());
/**
 * Exercises a transaction chain spanning the "cars-1" and "people-1" shards. Three chained
 * transactions are readied before any commit starts: one creating containers and lists, one
 * adding a car and a person (and reading them back while still open), and one deleting the
 * car. The final state read from the data store must contain the person but not the car.
 */
825 public void testTransactionChainWithMultipleShards() throws Exception{
826 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
827 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
828 "cars-1", "people-1");
830 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// Chained Tx 1: create containers and list nodes in both models.
832 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
833 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
835 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
836 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
838 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
839 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
841 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
// Chained Tx 2: created while Tx 1 is readied but not yet committed.
843 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
845 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
846 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
847 readWriteTx.write(carPath, car);
// The person entry is merged rather than written, unlike the car entry above.
849 MapEntryNode person = PeopleModel.newPersonEntry("jack");
850 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
851 readWriteTx.merge(personPath, person);
// Both entries must be readable on the open Tx.
853 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
854 assertEquals("isPresent", true, optional.isPresent());
855 assertEquals("Data node", car, optional.get());
857 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
858 assertEquals("isPresent", true, optional.isPresent());
859 assertEquals("Data node", person, optional.get());
861 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
// Chained Tx 3: delete the car that Tx 2 added.
863 writeTx = txChain.newWriteOnlyTransaction();
865 writeTx.delete(carPath);
867 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
// Start canCommit on the first two cohorts before finishing either commit.
869 ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
870 ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
872 doCommit(canCommit1, cohort1);
873 doCommit(canCommit2, cohort2);
// Verify the final state through a Tx obtained directly from the data store.
878 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
880 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
881 assertEquals("isPresent", false, optional.isPresent());
883 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
884 assertEquals("isPresent", true, optional.isPresent());
885 assertEquals("Data node", person, optional.get());
/**
 * Exercises creating and submitting many chained transactions back to back through a
 * ConcurrentDOMDataBroker that wraps the data store as the CONFIGURATION store. One
 * read-write transaction is submitted per car without waiting for the previous one; the car
 * list read through the chain must then contain {@code nCars} entries.
 */
892 public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
893 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
894 DistributedDataStore dataStore = setupDistributedDataStore(
895 "testCreateChainedTransactionsInQuickSuccession", "cars-1");
// The broker is built over this single store and uses a direct (same-thread) executor.
897 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
898 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
899 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
901 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
902 DOMTransactionChain txChain = broker.createTransactionChain(listener);
// Submit futures are collected here in submission order.
904 List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
906 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
907 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
908 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
909 futures.add(writeTx.submit());
// One chained read-write Tx per car, each merged and submitted immediately.
912 for(int i = 0; i < nCars; i++) {
913 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
915 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
916 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
918 futures.add(rwTx.submit());
921 for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
// Read the whole list back through the chain and compare its size with the number submitted.
925 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
926 LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
927 assertEquals("isPresent", true, optional.isPresent());
928 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
// Verifies that a transaction chain stays usable after an "empty" transaction, i.e. one on which
// no data operation was issued (per the test name, one that has been readied): a second
// read-write tx can be created on the chain and a read on it completes.
939 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
940 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
941 DistributedDataStore dataStore = setupDistributedDataStore(
942 "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
// This test uses the store's own transaction chain API directly.
944 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// rwTx1 is the empty transaction: no read, write or merge is issued on it.
946 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
// The next transaction on the chain must still be creatable and usable.
950 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
// Nothing was written to TEST_PATH in this test, so the read has to complete within 5 seconds
// and report the node as absent.
952 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
953 assertEquals("isPresent", false, optional.isPresent());
// Verifies that a transaction chain refuses to hand out another transaction while the previous
// write transaction on the chain is still open: every creation attempt is expected to fail with
// IllegalStateException.
962 public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
963 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
964 DistributedDataStore dataStore = setupDistributedDataStore(
965 "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
967 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// Open a write-only tx and use it, so the chain has an outstanding transaction.
969 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
970 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
972 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// writeTx is deliberately left open here; that outstanding tx is the precondition under test.
974 // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
// The helper is handed the chain and the exception type every creation attempt must raise.
977 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
// Verifies that no new transaction can be created on a transaction chain once it has been
// closed: every creation attempt is expected to fail with TransactionChainClosedException.
982 public void testCreateChainedTransactionAfterClose() throws Throwable {
983 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
984 DistributedDataStore dataStore = setupDistributedDataStore(
985 "testCreateChainedTransactionAfterClose", "test-1");
// Chain obtained directly from the data store; no transaction is created on it beforehand.
987 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
991 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
// The helper is handed the chain and the exception type every creation attempt must raise.
993 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
// Verifies that, once a write transaction on a chain has been readied, read-only transactions
// can be created on the chain and see the data it wrote, and that a further write transaction
// can still be created and used on the same chain afterwards.
998 public void testChainWithReadOnlyTxAfterPreviousReady() throws Throwable {
999 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1000 DistributedDataStore dataStore = setupDistributedDataStore(
1001 "testChainWithReadOnlyTxAfterPreviousReady", "test-1");
1003 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
1005 // Create a write tx and submit.
1007 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1008 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1009 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1011 // Create read-only tx's and issue a read.
// Both reads target the path written by writeTx; the returned futures are only awaited further
// below, after the next write tx has been created.
1013 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture1 =
1014 txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH);
1016 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture2 =
1017 txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH);
1019 // Create another write tx and issue the write.
1021 DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
1022 writeTx2.write(TestModel.OUTER_LIST_PATH,
1023 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1025 // Ensure the reads succeed.
// Each read must complete within 5 seconds and find the container written by writeTx.
1027 assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent());
1028 assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent());
1030 // Ensure the writes succeed.
1032 DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
// The outer list written by writeTx2 must be visible to a new read-only tx on the chain.
1037 assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH).
1038 checkedGet(5, TimeUnit.SECONDS).isPresent());
// Verifies the failure path of a chained transaction when the data store is set up with the
// single shard name "cars-1": submitting a tx that merges invalid data must fail with
// TransactionCommitFailedException, and the chain's listener must be told about the failure
// through onTransactionChainFailed.
1043 public void testChainedTransactionFailureWithSingleShard() throws Exception{
1044 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1045 DistributedDataStore dataStore = setupDistributedDataStore(
1046 "testChainedTransactionFailureWithSingleShard", "cars-1");
// Drive the data store through a ConcurrentDOMDataBroker that maps the CONFIGURATION datastore
// to it, so the broker's transaction chain and listener callbacks are exercised.
1048 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1049 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1050 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
// Mocked listener so the failure callback can be verified at the end.
1052 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1053 DOMTransactionChain txChain = broker.createTransactionChain(listener);
1055 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
// Build a container identified by CarsModel.BASE_QNAME whose only child is a leaf named
// TestModel.JUNK_QNAME. NOTE(review): presumably that leaf is not defined under the cars
// container in the schema, which is what makes the data invalid - confirm against the model.
1057 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1058 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1059 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1061 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
// The submit is expected to fail within 5 seconds; reaching fail() means it unexpectedly
// succeeded.
1064 rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1065 fail("Expected TransactionCommitFailedException");
1066 } catch (TransactionCommitFailedException e) {
// The listener must be notified for exactly this chain and this tx within 5000 ms; any
// Throwable is accepted as the reported cause.
1070 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
// Verifies the failure path of a chained transaction when the data store is set up with two
// shard names ("cars-1" and "people-1"): a single write tx puts valid people data and merges
// invalid cars data; the submit must fail with TransactionCommitFailedException, and the chain's
// listener must be told about the failure through onTransactionChainFailed.
1079 public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1080 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1081 DistributedDataStore dataStore = setupDistributedDataStore(
1082 "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
// Drive the data store through a ConcurrentDOMDataBroker that maps the CONFIGURATION datastore
// to it, so the broker's transaction chain and listener callbacks are exercised.
1084 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1085 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1086 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
// Mocked listener so the failure callback can be verified at the end.
1088 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1089 DOMTransactionChain txChain = broker.createTransactionChain(listener);
1091 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
// Valid part of the transaction: the people container.
1093 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
// Invalid part: a container identified by CarsModel.BASE_QNAME whose only child is a leaf named
// TestModel.JUNK_QNAME.
1095 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1096 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1097 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1099 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1100 // done for put for performance reasons.
1101 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
// The submit is expected to fail within 5 seconds; reaching fail() means it unexpectedly
// succeeded.
1104 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1105 fail("Expected TransactionCommitFailedException");
1106 } catch (TransactionCommitFailedException e) {
// The listener must be notified for exactly this chain and this tx within 5000 ms; any
// Throwable is accepted as the reported cause.
1110 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
// Verifies the life cycle of a data change listener registered on the data store: it receives an
// initial notification for data that already exists, it is notified about later writes below the
// registered path, and it receives nothing more once the registration has been closed.
1119 public void testChangeListenerRegistration() throws Exception{
1120 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1121 DistributedDataStore dataStore =
1122 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
// Write the test container before registering, so there is existing data at TEST_PATH.
1124 testWriteTransaction(dataStore, TestModel.TEST_PATH,
1125 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// NOTE(review): the constructor argument is presumably the number of change events the mock
// waits for - confirm against MockDataChangeListener.
1127 MockDataChangeListener listener = new MockDataChangeListener(1);
// Register on TEST_PATH with SUBTREE scope.
1129 ListenerRegistration<MockDataChangeListener>
1130 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1131 DataChangeScope.SUBTREE);
1133 assertNotNull("registerChangeListener returned null", listenerReg);
1135 // Wait for the initial notification
1137 listener.waitForChangeEvents(TestModel.TEST_PATH);
// Two further writes: the outer list itself, then the list entry with key 1.
1143 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1144 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1146 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1147 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1148 testWriteTransaction(dataStore, listPath,
1149 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1151 // Wait for the 2 updates.
1153 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
// Close the registration, then write one more list entry (key 2); the listener must not hear
// about it.
1155 listenerReg.close();
1157 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1158 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1159 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1161 listener.expectNoMoreChanges("Received unexpected change after close");
// Verifies that a data store started from a DatastoreSnapshot exposes the snapshot's contents:
// shard snapshots for "cars" and "people" are built from locally populated data trees, wrapped
// in a DatastoreSnapshot, and after the data store is set up the cars and people data read back
// from it must equal what was put into the snapshots.
1168 public void testRestoreFromDatastoreSnapshot() throws Exception{
1169 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
// The same name is used for the DatastoreSnapshot and for setting up the data store below.
1170 String name = "transactionIntegrationTest";
// Cars data: a cars node holding two car entries.
1172 ContainerNode carsNode = CarsModel.newCarsNode(CarsModel.newCarsMapNode(
1173 CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
1174 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
// Populate a stand-alone ShardDataTree with the cars data and read back the whole tree (empty
// YangInstanceIdentifier) to obtain the root node that goes into the snapshot.
1176 ShardDataTree dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL);
1177 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
1178 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree.getDataTree(),
1179 YangInstanceIdentifier.builder().build());
// Snapshot of the serialized root with an empty list of log entries. The numeric arguments are
// passed through unchanged; see Snapshot.create for their meaning.
1181 Snapshot carsSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
1182 Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
// Same procedure for the people data, using a fresh ShardDataTree.
1184 NormalizedNode<?, ?> peopleNode = PeopleModel.create();
1185 dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL);
1186 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
1187 root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.builder().build());
1189 Snapshot peopleSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
1190 Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
// Bundle both shard snapshots, each serialized with commons-lang3 SerializationUtils, into one
// DatastoreSnapshot (second constructor argument left null). NOTE(review): restoreFromSnapshot
// is not declared in this method; presumably it is a field that the data store setup below
// picks up - confirm.
1192 restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
1193 new DatastoreSnapshot.ShardSnapshot("cars",
1194 org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
1195 new DatastoreSnapshot.ShardSnapshot("people",
1196 org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
// Set up the data store with the "module-shards-member1.conf" configuration and the shard names
// used in the snapshot.
1198 DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
1199 true, "cars", "people");
// Both data sets must be readable (5 second limit each) and equal to the snapshot input.
1201 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
1203 Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1204 assertEquals("isPresent", true, optional.isPresent());
1205 assertEquals("Data node", carsNode, optional.get());
1207 optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1208 assertEquals("isPresent", true, optional.isPresent());
1209 assertEquals("Data node", peopleNode, optional.get());