1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import akka.actor.ActorSystem;
6 import akka.actor.Address;
7 import akka.actor.AddressFromURIString;
8 import akka.cluster.Cluster;
9 import akka.testkit.JavaTestKit;
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.typesafe.config.ConfigFactory;
15 import java.io.IOException;
16 import java.math.BigInteger;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.AfterClass;
24 import org.junit.BeforeClass;
25 import org.junit.Test;
26 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
27 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
28 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
29 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
30 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
31 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
32 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
33 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
34 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
48 public class DistributedDataStoreIntegrationTest {
50 private static ActorSystem system;
52 private final DatastoreContext.Builder datastoreContextBuilder =
53 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
56 public static void setUpClass() throws IOException {
57 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
58 Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
59 Cluster.get(system).join(member1Address);
63 public static void tearDownClass() throws IOException {
64 JavaTestKit.shutdownActorSystem(system);
68 protected ActorSystem getSystem() {
73 public void testWriteTransactionWithSingleShard() throws Exception{
74 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
75 DistributedDataStore dataStore =
76 setupDistributedDataStore("transactionIntegrationTest", "test-1");
78 testWriteTransaction(dataStore, TestModel.TEST_PATH,
79 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
81 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
82 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
89 public void testWriteTransactionWithMultipleShards() throws Exception{
90 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
91 DistributedDataStore dataStore =
92 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
94 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
95 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
97 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
98 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
100 doCommit(writeTx.ready());
102 writeTx = dataStore.newWriteOnlyTransaction();
104 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
105 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
107 doCommit(writeTx.ready());
109 writeTx = dataStore.newWriteOnlyTransaction();
111 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
112 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
113 writeTx.write(carPath, car);
115 MapEntryNode person = PeopleModel.newPersonEntry("jack");
116 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
117 writeTx.write(personPath, person);
119 doCommit(writeTx.ready());
121 // Verify the data in the store
123 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
125 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
126 assertEquals("isPresent", true, optional.isPresent());
127 assertEquals("Data node", car, optional.get());
129 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
130 assertEquals("isPresent", true, optional.isPresent());
131 assertEquals("Data node", person, optional.get());
138 public void testReadWriteTransactionWithSingleShard() throws Exception{
139 System.setProperty("shard.persistent", "true");
140 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
141 DistributedDataStore dataStore =
142 setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
144 // 1. Create a read-write Tx
146 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
147 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
149 // 2. Write some data
151 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
152 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
153 readWriteTx.write(nodePath, nodeToWrite );
155 // 3. Read the data from Tx
157 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
158 assertEquals("exists", true, exists);
160 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
161 assertEquals("isPresent", true, optional.isPresent());
162 assertEquals("Data node", nodeToWrite, optional.get());
164 // 4. Ready the Tx for commit
166 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
172 // 6. Verify the data in the store
174 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
176 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
177 assertEquals("isPresent", true, optional.isPresent());
178 assertEquals("Data node", nodeToWrite, optional.get());
185 public void testReadWriteTransactionWithMultipleShards() throws Exception{
186 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
187 DistributedDataStore dataStore =
188 setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
190 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
191 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
193 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
194 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
196 doCommit(readWriteTx.ready());
198 readWriteTx = dataStore.newReadWriteTransaction();
200 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
201 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
203 doCommit(readWriteTx.ready());
205 readWriteTx = dataStore.newReadWriteTransaction();
207 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
208 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
209 readWriteTx.write(carPath, car);
211 MapEntryNode person = PeopleModel.newPersonEntry("jack");
212 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
213 readWriteTx.write(personPath, person);
215 Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
216 assertEquals("exists", true, exists);
218 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
219 assertEquals("isPresent", true, optional.isPresent());
220 assertEquals("Data node", car, optional.get());
222 doCommit(readWriteTx.ready());
224 // Verify the data in the store
226 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
228 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
229 assertEquals("isPresent", true, optional.isPresent());
230 assertEquals("Data node", car, optional.get());
232 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
233 assertEquals("isPresent", true, optional.isPresent());
234 assertEquals("Data node", person, optional.get());
240 private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
241 final boolean writeOnly) throws Exception {
242 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
243 String shardName = "test-1";
245 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
246 // initialized until we create and submit the write the Tx.
247 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
248 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
249 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
251 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
253 // Create the write Tx
255 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
256 dataStore.newReadWriteTransaction();
257 assertNotNull("newReadWriteTransaction returned null", writeTx);
259 // Do some modification operations and ready the Tx on a separate thread.
261 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
262 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
263 TestModel.ID_QNAME, 1).build();
265 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
266 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
267 final CountDownLatch txReady = new CountDownLatch(1);
268 Thread txThread = new Thread() {
272 writeTx.write(TestModel.TEST_PATH,
273 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
275 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
276 TestModel.OUTER_LIST_QNAME).build());
278 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
279 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
281 writeTx.delete(listEntryPath);
283 txCohort.set(writeTx.ready());
284 } catch(Exception e) {
295 // Wait for the Tx operations to complete.
297 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
298 if(caughtEx.get() != null) {
299 throw caughtEx.get();
302 assertEquals("Tx ready", true, done);
304 // At this point the Tx operations should be waiting for the shard to initialize so
305 // trigger the latch to let the shard recovery to continue.
307 blockRecoveryLatch.countDown();
309 // Wait for the Tx commit to complete.
311 doCommit(txCohort.get());
313 // Verify the data in the store
315 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
317 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
318 get(5, TimeUnit.SECONDS);
319 assertEquals("isPresent", true, optional.isPresent());
321 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
322 assertEquals("isPresent", true, optional.isPresent());
324 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
325 assertEquals("isPresent", false, optional.isPresent());
332 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
333 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
334 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
338 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
339 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
343 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
344 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
345 String testName = "testTransactionReadsWithShardNotInitiallyReady";
346 String shardName = "test-1";
348 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
349 // initialized until we create the Tx.
350 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
351 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
352 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
354 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
356 // Create the read-write Tx
358 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
359 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
361 // Do some reads on the Tx on a separate thread.
363 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
364 new AtomicReference<>();
365 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
366 txReadFuture = new AtomicReference<>();
367 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
368 final CountDownLatch txReadsDone = new CountDownLatch(1);
369 Thread txThread = new Thread() {
373 readWriteTx.write(TestModel.TEST_PATH,
374 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
376 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
378 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
379 } catch(Exception e) {
383 txReadsDone.countDown();
390 // Wait for the Tx operations to complete.
392 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
393 if(caughtEx.get() != null) {
394 throw caughtEx.get();
397 assertEquals("Tx reads done", true, done);
399 // At this point the Tx operations should be waiting for the shard to initialize so
400 // trigger the latch to let the shard recovery to continue.
402 blockRecoveryLatch.countDown();
404 // Wait for the reads to complete and verify.
406 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
407 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
415 @Test(expected=NotInitializedException.class)
416 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
417 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
418 String testName = "testTransactionCommitFailureWithShardNotInitialized";
419 String shardName = "test-1";
421 // Set the shard initialization timeout low for the test.
423 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
425 // Setup the InMemoryJournal to block shard recovery indefinitely.
427 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
428 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
429 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
431 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
433 // Create the write Tx
435 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
436 assertNotNull("newReadWriteTransaction returned null", writeTx);
438 // Do some modifications and ready the Tx on a separate thread.
440 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
441 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
442 final CountDownLatch txReady = new CountDownLatch(1);
443 Thread txThread = new Thread() {
447 writeTx.write(TestModel.TEST_PATH,
448 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
450 txCohort.set(writeTx.ready());
451 } catch(Exception e) {
462 // Wait for the Tx operations to complete.
464 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
465 if(caughtEx.get() != null) {
466 throw caughtEx.get();
469 assertEquals("Tx ready", true, done);
471 // Wait for the commit to complete. Since the shard never initialized, the Tx should
472 // have timed out and throw an appropriate exception cause.
475 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
476 } catch(ExecutionException e) {
479 blockRecoveryLatch.countDown();
485 @Test(expected=NotInitializedException.class)
486 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
487 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
488 String testName = "testTransactionReadFailureWithShardNotInitialized";
489 String shardName = "test-1";
491 // Set the shard initialization timeout low for the test.
493 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
495 // Setup the InMemoryJournal to block shard recovery indefinitely.
497 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
498 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
499 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
501 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
503 // Create the read-write Tx
505 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
506 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
508 // Do a read on the Tx on a separate thread.
510 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
511 txReadFuture = new AtomicReference<>();
512 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
513 final CountDownLatch txReadDone = new CountDownLatch(1);
514 Thread txThread = new Thread() {
518 readWriteTx.write(TestModel.TEST_PATH,
519 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
521 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
524 } catch(Exception e) {
528 txReadDone.countDown();
535 // Wait for the Tx operations to complete.
537 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
538 if(caughtEx.get() != null) {
539 throw caughtEx.get();
542 assertEquals("Tx read done", true, done);
544 // Wait for the read to complete. Since the shard never initialized, the Tx should
545 // have timed out and throw an appropriate exception cause.
548 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
549 } catch(ReadFailedException e) {
552 blockRecoveryLatch.countDown();
558 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
559 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
560 String testName = "testTransactionCommitFailureWithNoShardLeader";
561 String shardName = "default";
563 // We don't want the shard to become the leader so prevent shard election from completing
564 // by setting the election timeout, which is based on the heartbeat interval, really high.
566 datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
567 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
569 // Set the leader election timeout low for the test.
571 datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
573 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
575 // Create the write Tx.
577 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
578 dataStore.newReadWriteTransaction();
579 assertNotNull("newReadWriteTransaction returned null", writeTx);
581 // Do some modifications and ready the Tx on a separate thread.
583 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
584 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
585 final CountDownLatch txReady = new CountDownLatch(1);
586 Thread txThread = new Thread() {
590 writeTx.write(TestModel.JUNK_PATH,
591 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
593 txCohort.set(writeTx.ready());
594 } catch(Exception e) {
605 // Wait for the Tx operations to complete.
607 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
608 if(caughtEx.get() != null) {
609 throw caughtEx.get();
612 assertEquals("Tx ready", true, done);
614 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
615 // should have timed out and throw an appropriate exception cause.
618 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
619 } catch(ExecutionException e) {
627 @Test(expected=NoShardLeaderException.class)
628 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
629 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
630 testTransactionCommitFailureWithNoShardLeader(true);
633 @Test(expected=NoShardLeaderException.class)
634 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
635 testTransactionCommitFailureWithNoShardLeader(false);
639 public void testTransactionAbort() throws Exception{
640 System.setProperty("shard.persistent", "true");
641 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
642 DistributedDataStore dataStore =
643 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
645 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
646 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
648 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
650 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
652 cohort.canCommit().get(5, TimeUnit.SECONDS);
654 cohort.abort().get(5, TimeUnit.SECONDS);
656 testWriteTransaction(dataStore, TestModel.TEST_PATH,
657 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
664 public void testTransactionChainWithSingleShard() throws Exception{
665 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
666 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
668 // 1. Create a Tx chain and write-only Tx
670 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
672 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
673 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
675 // 2. Write some data
677 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
678 writeTx.write(TestModel.TEST_PATH, testNode);
680 // 3. Ready the Tx for commit
682 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
684 // 4. Commit the Tx on another thread that first waits for the second read Tx.
686 final CountDownLatch continueCommit1 = new CountDownLatch(1);
687 final CountDownLatch commit1Done = new CountDownLatch(1);
688 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
693 continueCommit1.await();
695 } catch (Exception e) {
698 commit1Done.countDown();
703 // 5. Create a new read Tx from the chain to read and verify the data from the first
704 // Tx is visible after being readied.
706 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
707 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
708 assertEquals("isPresent", true, optional.isPresent());
709 assertEquals("Data node", testNode, optional.get());
711 // 6. Create a new RW Tx from the chain, write more data, and ready it
713 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
714 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
715 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
717 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
719 // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
720 // verify it is visible.
722 readTx = txChain.newReadWriteTransaction();
723 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
724 assertEquals("isPresent", true, optional.isPresent());
725 assertEquals("Data node", outerNode, optional.get());
727 // 8. Wait for the 2 commits to complete and close the chain.
729 continueCommit1.countDown();
730 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
732 if(commit1Error.get() != null) {
733 throw commit1Error.get();
740 // 9. Create a new read Tx from the data store and verify committed data.
742 readTx = dataStore.newReadOnlyTransaction();
743 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
744 assertEquals("isPresent", true, optional.isPresent());
745 assertEquals("Data node", outerNode, optional.get());
752 public void testTransactionChainWithMultipleShards() throws Exception{
753 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
754 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
755 "cars-1", "people-1");
757 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
759 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
760 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
762 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
763 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
765 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
766 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
768 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
770 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
772 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
773 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
774 readWriteTx.write(carPath, car);
776 MapEntryNode person = PeopleModel.newPersonEntry("jack");
777 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
778 readWriteTx.merge(personPath, person);
780 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
781 assertEquals("isPresent", true, optional.isPresent());
782 assertEquals("Data node", car, optional.get());
784 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
785 assertEquals("isPresent", true, optional.isPresent());
786 assertEquals("Data node", person, optional.get());
788 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
790 writeTx = txChain.newWriteOnlyTransaction();
792 writeTx.delete(carPath);
794 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
796 ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
797 ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
799 doCommit(canCommit1, cohort1);
800 doCommit(canCommit2, cohort2);
805 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
807 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
808 assertEquals("isPresent", false, optional.isPresent());
810 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
811 assertEquals("isPresent", true, optional.isPresent());
812 assertEquals("Data node", person, optional.get());
819 public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
820 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
821 DistributedDataStore dataStore = setupDistributedDataStore(
822 "testCreateChainedTransactionsInQuickSuccession", "test-1");
824 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
826 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
829 List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
830 for(int i = 0; i < nTxs; i++) {
831 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
833 rwTx.merge(TestModel.TEST_PATH, testNode);
835 cohorts.add(rwTx.ready());
839 for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
850 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
851 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
852 DistributedDataStore dataStore = setupDistributedDataStore(
853 "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
855 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
857 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
861 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
863 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
864 assertEquals("isPresent", false, optional.isPresent());
873 public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
874 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
875 DistributedDataStore dataStore = setupDistributedDataStore(
876 "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
878 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
880 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
881 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
883 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
885 // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
888 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
893 public void testCreateChainedTransactionAfterClose() throws Throwable {
894 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
895 DistributedDataStore dataStore = setupDistributedDataStore(
896 "testCreateChainedTransactionAfterClose", "test-1");
898 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
902 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
904 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
909 public void testChangeListenerRegistration() throws Exception{
910 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
911 DistributedDataStore dataStore =
912 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
914 testWriteTransaction(dataStore, TestModel.TEST_PATH,
915 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
917 MockDataChangeListener listener = new MockDataChangeListener(1);
919 ListenerRegistration<MockDataChangeListener>
920 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
921 DataChangeScope.SUBTREE);
923 assertNotNull("registerChangeListener returned null", listenerReg);
925 // Wait for the initial notification
927 listener.waitForChangeEvents(TestModel.TEST_PATH);
933 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
934 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
936 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
937 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
938 testWriteTransaction(dataStore, listPath,
939 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
941 // Wait for the 2 updates.
943 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
947 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
948 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
949 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
951 listener.expectNoMoreChanges("Received unexpected change after close");