1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import akka.actor.ActorSystem;
6 import akka.actor.Address;
7 import akka.actor.AddressFromURIString;
8 import akka.cluster.Cluster;
9 import akka.testkit.JavaTestKit;
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.Uninterruptibles;
13 import com.typesafe.config.ConfigFactory;
14 import java.io.IOException;
15 import java.math.BigInteger;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicReference;
22 import org.junit.AfterClass;
23 import org.junit.BeforeClass;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
26 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
27 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
28 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
29 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
30 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
31 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
32 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
33 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
/**
 * Integration tests that exercise {@code DistributedDataStore} transactions, transaction
 * chains and change-listener registration through {@code IntegrationTestKit}.
 */
47 public class DistributedDataStoreIntegrationTest {
// Actor system assigned in setUpClass() and shut down in tearDownClass().
49 private static ActorSystem system;
// Builder handed to every IntegrationTestKit; starts with a 100 ms shard heartbeat
// interval and is adjusted further by individual tests before the data store is set up.
51 private final DatastoreContext.Builder datastoreContextBuilder =
52 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
/**
 * Static fixture: creates the "cluster-test" actor system from the "Member1" section of the
 * loaded configuration and joins it to the cluster at
 * akka.tcp://cluster-test@127.0.0.1:2558.
 *
 * @throws IOException declared by the signature
 */
55 public static void setUpClass() throws IOException {
56 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
57 Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
58 Cluster.get(system).join(member1Address);
/**
 * Static teardown: shuts down the shared actor system via
 * {@code JavaTestKit.shutdownActorSystem}.
 *
 * @throws IOException declared by the signature
 */
62 public static void tearDownClass() throws IOException {
63 JavaTestKit.shutdownActorSystem(system);
/**
 * Supplies the {@link ActorSystem} that each test passes to {@code IntegrationTestKit}.
 * NOTE(review): presumably returns the static {@code system} field - confirm.
 */
67 protected ActorSystem getSystem() {
/**
 * Sets up a data store via {@code setupDistributedDataStore("transactionIntegrationTest",
 * "test-1")} and runs the kit's {@code testWriteTransaction} helper twice: once for the
 * test container at {@code TestModel.TEST_PATH} and once for an empty outer-list map node
 * at {@code TestModel.OUTER_LIST_PATH}.
 * NOTE(review): the trailing string argument(s) of setupDistributedDataStore are presumably
 * shard names - confirm against IntegrationTestKit.
 */
72 public void testWriteTransactionWithSingleShard() throws Exception{
73 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
74 DistributedDataStore dataStore =
75 setupDistributedDataStore("transactionIntegrationTest", "test-1");
77 testWriteTransaction(dataStore, TestModel.TEST_PATH,
78 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
80 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
81 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
/**
 * Commits three consecutive write-only transactions that each touch both the cars and the
 * people models, then reads the written car and person entries back through a read-only
 * transaction (5 second timeout per read) and asserts they equal what was written.
 */
88 public void testWriteTransactionWithMultipleShards() throws Exception{
89 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
90 DistributedDataStore dataStore =
91 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
// Tx 1: write the empty top-level containers for both models.
93 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
94 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
96 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
97 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
99 doCommit(writeTx.ready());
// Tx 2: write the car and person list nodes.
101 writeTx = dataStore.newWriteOnlyTransaction();
103 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
104 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
106 doCommit(writeTx.ready());
// Tx 3: write one entry into each list.
108 writeTx = dataStore.newWriteOnlyTransaction();
110 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
111 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
112 writeTx.write(carPath, car);
114 MapEntryNode person = PeopleModel.newPersonEntry("jack");
115 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
116 writeTx.write(personPath, person);
118 doCommit(writeTx.ready());
120 // Verify the data in the store
122 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
124 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
125 assertEquals("isPresent", true, optional.isPresent());
126 assertEquals("Data node", car, optional.get());
128 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
129 assertEquals("isPresent", true, optional.isPresent());
130 assertEquals("Data node", person, optional.get());
/**
 * Writes the test container through a read-write transaction, checks that {@code exists}
 * and {@code read} on that same transaction see the uncommitted data, readies the
 * transaction, and finally reads the node back through a fresh read-only transaction.
 */
137 public void testReadWriteTransactionWithSingleShard() throws Exception{
// NOTE(review): this JVM-wide system property is not restored anywhere in the visible code.
138 System.setProperty("shard.persistent", "true");
139 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
140 DistributedDataStore dataStore =
141 setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
143 // 1. Create a read-write Tx
145 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
146 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
148 // 2. Write some data
150 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
151 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
152 readWriteTx.write(nodePath, nodeToWrite );
154 // 3. Read the data from Tx
156 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
157 assertEquals("exists", true, exists);
159 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
160 assertEquals("isPresent", true, optional.isPresent());
161 assertEquals("Data node", nodeToWrite, optional.get());
163 // 4. Ready the Tx for commit
165 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
171 // 6. Verify the data in the store
173 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
175 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
176 assertEquals("isPresent", true, optional.isPresent());
177 assertEquals("Data node", nodeToWrite, optional.get());
/**
 * Read-write counterpart of the multi-shard write test: commits three read-write
 * transactions spanning the cars and people models, checks {@code exists}/{@code read} on
 * the third transaction before it is committed, then verifies both entries through a
 * read-only transaction.
 */
184 public void testReadWriteTransactionWithMultipleShards() throws Exception{
185 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
186 DistributedDataStore dataStore =
187 setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
// Tx 1: write the empty top-level containers for both models.
189 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
190 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
192 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
193 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
195 doCommit(readWriteTx.ready());
// Tx 2: write the car and person list nodes.
197 readWriteTx = dataStore.newReadWriteTransaction();
199 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
200 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
202 doCommit(readWriteTx.ready());
// Tx 3: write one entry into each list and read the car entry back before committing.
204 readWriteTx = dataStore.newReadWriteTransaction();
206 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
207 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
208 readWriteTx.write(carPath, car);
210 MapEntryNode person = PeopleModel.newPersonEntry("jack");
211 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
212 readWriteTx.write(personPath, person);
214 Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
215 assertEquals("exists", true, exists);
217 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
218 assertEquals("isPresent", true, optional.isPresent());
219 assertEquals("Data node", car, optional.get());
221 doCommit(readWriteTx.ready());
223 // Verify the data in the store
225 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
227 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
228 assertEquals("isPresent", true, optional.isPresent());
229 assertEquals("Data node", car, optional.get());
231 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
232 assertEquals("isPresent", true, optional.isPresent());
233 assertEquals("Data node", person, optional.get());
/**
 * Shared body for the "shard not initially ready" write tests. Registers a latch with
 * {@code InMemoryJournal} for the shard's persistence id, performs write/merge/delete
 * operations and {@code ready()} on a separate thread, then releases the latch, commits the
 * cohort and verifies the resulting store contents.
 *
 * @param testName used as the first argument to setupDistributedDataStore and as the suffix
 *                 of the persistence id
 * @param writeOnly {@code true} to use a write-only transaction, {@code false} for read-write
 * @throws Exception any exception captured on the Tx thread, or a test failure
 */
239 private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
240 final boolean writeOnly) throws Exception {
241 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
242 String shardName = "test-1";
244 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
245 // initialized until we create and submit the write Tx.
246 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
247 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
248 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// NOTE(review): the 'false' argument presumably tells the kit not to wait for the shard to
// become ready - confirm against IntegrationTestKit.
250 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
252 // Create the write Tx
254 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
255 dataStore.newReadWriteTransaction();
// NOTE(review): the failure message names newReadWriteTransaction even when writeOnly is true.
256 assertNotNull("newReadWriteTransaction returned null", writeTx);
258 // Do some modification operations and ready the Tx on a separate thread.
260 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
261 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
262 TestModel.ID_QNAME, 1).build();
// Hand-off points between the Tx thread and the test thread.
264 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
265 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
266 final CountDownLatch txReady = new CountDownLatch(1);
267 Thread txThread = new Thread() {
271 writeTx.write(TestModel.TEST_PATH,
272 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
274 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
275 TestModel.OUTER_LIST_QNAME).build());
277 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
278 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
// The entry is deleted again, so the final verification expects it to be absent.
280 writeTx.delete(listEntryPath);
282 txCohort.set(writeTx.ready());
283 } catch(Exception e) {
294 // Wait for the Tx operations to complete.
296 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
297 if(caughtEx.get() != null) {
298 throw caughtEx.get();
301 assertEquals("Tx ready", true, done);
303 // At this point the Tx operations should be waiting for the shard to initialize so
304 // trigger the latch to let the shard recovery continue.
306 blockRecoveryLatch.countDown();
308 // Wait for the Tx commit to complete.
310 doCommit(txCohort.get());
312 // Verify the data in the store
314 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
316 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
317 get(5, TimeUnit.SECONDS);
318 assertEquals("isPresent", true, optional.isPresent());
320 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
321 assertEquals("isPresent", true, optional.isPresent());
323 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
324 assertEquals("isPresent", false, optional.isPresent());
/**
 * Write-only variant: enables write-only transaction optimizations on the context builder
 * and runs the shared "shard not initially ready" write scenario with writeOnly = true.
 */
331 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
332 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
333 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
/**
 * Read-write variant: runs the shared "shard not initially ready" write scenario with
 * writeOnly = false.
 */
337 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
338 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
/**
 * Registers a latch with {@code InMemoryJournal} for the shard's persistence id, issues a
 * write plus {@code exists} and {@code read} calls on a read-write transaction from a
 * separate thread, then releases the latch and asserts that both futures complete with a
 * positive result within 5 seconds.
 */
342 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
343 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
344 String testName = "testTransactionReadsWithShardNotInitiallyReady";
345 String shardName = "test-1";
347 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
348 // initialized until we create the Tx.
349 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
350 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
351 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
353 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
355 // Create the read-write Tx
357 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
358 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
360 // Do some reads on the Tx on a separate thread.
// The futures are captured here so the test thread can wait on them after releasing the latch.
362 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
363 new AtomicReference<>();
364 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
365 txReadFuture = new AtomicReference<>();
366 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
367 final CountDownLatch txReadsDone = new CountDownLatch(1);
368 Thread txThread = new Thread() {
372 readWriteTx.write(TestModel.TEST_PATH,
373 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
375 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
377 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
378 } catch(Exception e) {
382 txReadsDone.countDown();
389 // Wait for the Tx operations to complete.
391 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
392 if(caughtEx.get() != null) {
393 throw caughtEx.get();
396 assertEquals("Tx reads done", true, done);
398 // At this point the Tx operations should be waiting for the shard to initialize so
399 // trigger the latch to let the shard recovery continue.
401 blockRecoveryLatch.countDown();
403 // Wait for the reads to complete and verify.
405 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
406 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
/**
 * Expects {@code NotInitializedException}: with a 300 ms shard initialization timeout and
 * shard recovery held back by a latch, a write-only transaction is written and readied on a
 * separate thread and {@code canCommit()} is then awaited for up to 5 seconds.
 * NOTE(review): the ExecutionException handler body is not shown here; presumably it
 * rethrows the cause so the expected exception type surfaces - confirm.
 */
414 @Test(expected=NotInitializedException.class)
415 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
416 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
417 String testName = "testTransactionCommitFailureWithShardNotInitialized";
418 String shardName = "test-1";
420 // Set the shard initialization timeout low for the test.
422 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
424 // Setup the InMemoryJournal to block shard recovery indefinitely.
426 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
427 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
428 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
430 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
432 // Create the write Tx
434 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// NOTE(review): the failure message names newReadWriteTransaction although the Tx above
// comes from newWriteOnlyTransaction.
435 assertNotNull("newReadWriteTransaction returned null", writeTx);
437 // Do some modifications and ready the Tx on a separate thread.
439 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
440 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
441 final CountDownLatch txReady = new CountDownLatch(1);
442 Thread txThread = new Thread() {
446 writeTx.write(TestModel.TEST_PATH,
447 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
449 txCohort.set(writeTx.ready());
450 } catch(Exception e) {
461 // Wait for the Tx operations to complete.
463 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
464 if(caughtEx.get() != null) {
465 throw caughtEx.get();
468 assertEquals("Tx ready", true, done);
470 // Wait for the commit to complete. Since the shard never initialized, the Tx should
471 // have timed out and throw an appropriate exception cause.
474 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
475 } catch(ExecutionException e) {
// Release the recovery latch that was registered with the journal above.
478 blockRecoveryLatch.countDown();
/**
 * Expects {@code NotInitializedException}: with a 300 ms shard initialization timeout and
 * shard recovery held back by a latch, a read-write transaction performs a write and a read
 * on a separate thread and the read future is then awaited for up to 5 seconds.
 * NOTE(review): the ReadFailedException handler body is not shown here; presumably it
 * rethrows the cause so the expected exception type surfaces - confirm.
 */
484 @Test(expected=NotInitializedException.class)
485 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
486 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
487 String testName = "testTransactionReadFailureWithShardNotInitialized";
488 String shardName = "test-1";
490 // Set the shard initialization timeout low for the test.
492 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
494 // Setup the InMemoryJournal to block shard recovery indefinitely.
496 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
497 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
498 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
500 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
502 // Create the read-write Tx
504 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
505 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
507 // Do a read on the Tx on a separate thread.
509 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
510 txReadFuture = new AtomicReference<>();
511 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
512 final CountDownLatch txReadDone = new CountDownLatch(1);
513 Thread txThread = new Thread() {
517 readWriteTx.write(TestModel.TEST_PATH,
518 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
520 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
523 } catch(Exception e) {
527 txReadDone.countDown();
534 // Wait for the Tx operations to complete.
536 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
537 if(caughtEx.get() != null) {
538 throw caughtEx.get();
541 assertEquals("Tx read done", true, done);
543 // Wait for the read to complete. Since the shard never initialized, the Tx should
544 // have timed out and throw an appropriate exception cause.
547 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
548 } catch(ReadFailedException e) {
// Release the recovery latch that was registered with the journal above.
551 blockRecoveryLatch.countDown();
/**
 * Shared body for the "no shard leader" commit-failure tests. Configures a 30000 ms
 * heartbeat interval, a 300 ms initialization timeout and a 1 ms leader election timeout,
 * writes a junk container and readies the Tx on a separate thread, then awaits
 * {@code canCommit()} for up to 5 seconds.
 *
 * @param writeOnly {@code true} to use a write-only transaction, {@code false} for read-write
 * @throws Throwable any exception captured on the Tx thread or raised while waiting on canCommit
 */
557 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
558 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
559 String testName = "testTransactionCommitFailureWithNoShardLeader";
560 String shardName = "default";
562 // We don't want the shard to become the leader so prevent shard election from completing
563 // by setting the election timeout, which is based on the heartbeat interval, really high.
565 datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
566 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
568 // Set the leader election timeout low for the test.
570 datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
572 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
574 // Create the write Tx.
576 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
577 dataStore.newReadWriteTransaction();
// NOTE(review): the failure message names newReadWriteTransaction even when writeOnly is true.
578 assertNotNull("newReadWriteTransaction returned null", writeTx);
580 // Do some modifications and ready the Tx on a separate thread.
582 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
583 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
584 final CountDownLatch txReady = new CountDownLatch(1);
585 Thread txThread = new Thread() {
589 writeTx.write(TestModel.JUNK_PATH,
590 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
592 txCohort.set(writeTx.ready());
593 } catch(Exception e) {
604 // Wait for the Tx operations to complete.
606 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
607 if(caughtEx.get() != null) {
608 throw caughtEx.get();
611 assertEquals("Tx ready", true, done);
613 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
614 // should have timed out and throw an appropriate exception cause.
617 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
618 } catch(ExecutionException e) {
/**
 * Write-only variant of the "no shard leader" commit failure: enables write-only
 * transaction optimizations and expects {@code NoShardLeaderException}.
 */
626 @Test(expected=NoShardLeaderException.class)
627 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
628 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
629 testTransactionCommitFailureWithNoShardLeader(true);
/**
 * Read-write variant of the "no shard leader" commit failure; expects
 * {@code NoShardLeaderException}.
 */
632 @Test(expected=NoShardLeaderException.class)
633 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
634 testTransactionCommitFailureWithNoShardLeader(false);
/**
 * Writes the test container in a write-only transaction, runs {@code canCommit()} and then
 * {@code abort()} on its cohort (5 second timeout each), and afterwards runs the kit's
 * {@code testWriteTransaction} helper for the same path and node.
 */
638 public void testTransactionAbort() throws Exception{
// NOTE(review): this JVM-wide system property is not restored anywhere in the visible code.
639 System.setProperty("shard.persistent", "true");
640 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
641 DistributedDataStore dataStore =
642 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
644 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
645 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
647 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
649 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
651 cohort.canCommit().get(5, TimeUnit.SECONDS);
653 cohort.abort().get(5, TimeUnit.SECONDS);
// A fresh write of the same node is attempted after the abort.
655 testWriteTransaction(dataStore, TestModel.TEST_PATH,
656 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
 * Exercises a transaction chain on a single shard: data written by a readied (but not yet
 * committed) chain transaction must be readable by the next transaction created from the
 * chain, and the outer list written through the chain must be readable from the data store
 * at the end.
 */
663 public void testTransactionChainWithSingleShard() throws Exception{
664 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
665 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
667 // 1. Create a Tx chain and write-only Tx
669 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
671 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
672 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
674 // 2. Write some data
676 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
677 writeTx.write(TestModel.TEST_PATH, testNode);
679 // 3. Ready the Tx for commit
681 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
683 // 4. Commit the Tx on another thread that first waits for the second read Tx.
// continueCommit1 gates the start of the first commit; commit1Done signals its completion.
685 final CountDownLatch continueCommit1 = new CountDownLatch(1);
686 final CountDownLatch commit1Done = new CountDownLatch(1);
687 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
692 continueCommit1.await();
694 } catch (Exception e) {
697 commit1Done.countDown();
702 // 5. Create a new read Tx from the chain to read and verify the data from the first
703 // Tx is visible after being readied.
705 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
706 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
707 assertEquals("isPresent", true, optional.isPresent());
708 assertEquals("Data node", testNode, optional.get());
710 // 6. Create a new RW Tx from the chain, write more data, and ready it
712 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
713 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
714 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
716 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
718 // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
719 // verify it is visible.
// NOTE(review): the step says "read Tx" but the call below is newReadWriteTransaction().
721 readTx = txChain.newReadWriteTransaction();
722 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
723 assertEquals("isPresent", true, optional.isPresent());
724 assertEquals("Data node", outerNode, optional.get());
726 // 8. Wait for the 2 commits to complete and close the chain.
728 continueCommit1.countDown();
729 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
731 if(commit1Error.get() != null) {
732 throw commit1Error.get();
739 // 9. Create a new read Tx from the data store and verify committed data.
741 readTx = dataStore.newReadOnlyTransaction();
742 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
743 assertEquals("isPresent", true, optional.isPresent());
744 assertEquals("Data node", outerNode, optional.get());
/**
 * Exercises a transaction chain spanning the cars and people models: a write-only Tx sets
 * up the containers and lists, a read-write Tx adds one car and one person and reads them
 * back within the chain, a third (empty) write-only Tx is readied, and the entries are
 * finally read from the data store.
 */
751 public void testTransactionChainWithMultipleShards() throws Exception{
752 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
753 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
754 "cars-1", "people-1");
756 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
758 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
759 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
761 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
762 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
764 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
765 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
767 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
769 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
771 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
772 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
773 readWriteTx.write(carPath, car);
775 MapEntryNode person = PeopleModel.newPersonEntry("jack");
776 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
// The person entry is merged rather than written.
777 readWriteTx.merge(personPath, person);
779 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
780 assertEquals("isPresent", true, optional.isPresent());
781 assertEquals("Data node", car, optional.get());
783 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
784 assertEquals("isPresent", true, optional.isPresent());
785 assertEquals("Data node", person, optional.get());
787 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
789 writeTx = txChain.newWriteOnlyTransaction();
// The delete is disabled, so this third Tx is readied without modifications and the
// final check below expects the person entry to still be present.
791 //writeTx.delete(personPath);
793 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
801 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
803 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
804 assertEquals("isPresent", true, optional.isPresent());
805 assertEquals("Data node", car, optional.get());
807 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
808 //assertEquals("isPresent", false, optional.isPresent());
809 assertEquals("isPresent", true, optional.isPresent());
/**
 * Creates {@code nTxs} read-write transactions back-to-back from one transaction chain,
 * merging the same test container in each and readying it immediately, and collects the
 * resulting cohorts for processing in the trailing loop.
 */
816 public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
817 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
818 DistributedDataStore dataStore = setupDistributedDataStore(
819 "testCreateChainedTransactionsInQuickSuccession", "test-1");
821 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
823 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// The cohort list is presized to the number of transactions.
826 List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
827 for(int i = 0; i < nTxs; i++) {
828 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
830 rwTx.merge(TestModel.TEST_PATH, testNode);
832 cohorts.add(rwTx.ready());
836 for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
/**
 * Creates two read-write transactions in sequence from one transaction chain and asserts
 * that a read of {@code TestModel.TEST_PATH} on the second one completes within 5 seconds
 * and reports the node as absent.
 */
847 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
848 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
849 DistributedDataStore dataStore = setupDistributedDataStore(
850 "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
852 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
854 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
858 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
860 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
861 assertEquals("isPresent", false, optional.isPresent());
/**
 * Leaves a chain's write-only transaction un-readied after a write and asserts, via the
 * kit's {@code assertExceptionOnTxChainCreates} helper, that creating further transactions
 * from the chain raises {@code IllegalStateException}.
 */
870 public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
871 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
872 DistributedDataStore dataStore = setupDistributedDataStore(
873 "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
875 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
877 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
878 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
880 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
882 // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
885 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
/**
 * Asserts, via the kit's {@code assertExceptionOnTxChainCreates} helper, that creating
 * transactions from the chain raises {@code TransactionChainClosedException}.
 */
890 public void testCreateChainedTransactionAfterClose() throws Throwable {
891 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
892 DistributedDataStore dataStore = setupDistributedDataStore(
893 "testCreateChainedTransactionAfterClose", "test-1");
895 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
899 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
901 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
/**
 * Registers a {@code MockDataChangeListener} with SUBTREE scope at
 * {@code TestModel.TEST_PATH} after that node has been written, waits for the initial
 * notification, waits for the two change events caused by writing the outer list and its
 * entry with id 1, and finally writes the entry with id 2 and expects no further changes.
 */
906 public void testChangeListenerRegistration() throws Exception{
907 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
908 DistributedDataStore dataStore =
909 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
// Write the node first so it already exists when the listener is registered.
911 testWriteTransaction(dataStore, TestModel.TEST_PATH,
912 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
914 MockDataChangeListener listener = new MockDataChangeListener(1);
916 ListenerRegistration<MockDataChangeListener>
917 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
918 DataChangeScope.SUBTREE);
920 assertNotNull("registerChangeListener returned null", listenerReg);
922 // Wait for the initial notification
924 listener.waitForChangeEvents(TestModel.TEST_PATH);
930 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
931 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
933 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
934 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
935 testWriteTransaction(dataStore, listPath,
936 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
938 // Wait for the 2 updates.
940 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
// One more write (entry id 2); the listener must not report any change for it.
944 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
945 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
946 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
948 listener.expectNoMoreChanges("Received unexpected change after close");