1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import akka.actor.ActorSystem;
6 import akka.actor.Address;
7 import akka.actor.AddressFromURIString;
8 import akka.cluster.Cluster;
9 import akka.testkit.JavaTestKit;
10 import com.google.common.base.Optional;
11 import com.google.common.collect.ImmutableMap;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.Uninterruptibles;
16 import com.typesafe.config.ConfigFactory;
17 import java.io.IOException;
18 import java.math.BigInteger;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicReference;
26 import org.junit.AfterClass;
27 import org.junit.BeforeClass;
28 import org.junit.Test;
29 import org.mockito.Mockito;
30 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
31 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
32 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
33 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
34 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
35 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
36 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
37 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
38 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
39 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
40 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
41 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
42 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
44 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
45 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
46 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
47 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
48 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
49 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
50 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
55 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
56 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
57 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
59 public class DistributedDataStoreIntegrationTest {
61 private static ActorSystem system;
63 private final DatastoreContext.Builder datastoreContextBuilder =
64 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
67 public static void setUpClass() throws IOException {
68 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
69 Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
70 Cluster.get(system).join(member1Address);
74 public static void tearDownClass() throws IOException {
75 JavaTestKit.shutdownActorSystem(system);
79 protected ActorSystem getSystem() {
84 public void testWriteTransactionWithSingleShard() throws Exception{
85 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
86 DistributedDataStore dataStore =
87 setupDistributedDataStore("transactionIntegrationTest", "test-1");
89 testWriteTransaction(dataStore, TestModel.TEST_PATH,
90 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
92 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
93 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
100 public void testWriteTransactionWithMultipleShards() throws Exception{
101 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
102 DistributedDataStore dataStore =
103 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
105 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
106 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
108 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
109 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
111 doCommit(writeTx.ready());
113 writeTx = dataStore.newWriteOnlyTransaction();
115 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
116 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
118 doCommit(writeTx.ready());
120 writeTx = dataStore.newWriteOnlyTransaction();
122 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
123 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
124 writeTx.write(carPath, car);
126 MapEntryNode person = PeopleModel.newPersonEntry("jack");
127 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
128 writeTx.write(personPath, person);
130 doCommit(writeTx.ready());
132 // Verify the data in the store
134 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
136 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
137 assertEquals("isPresent", true, optional.isPresent());
138 assertEquals("Data node", car, optional.get());
140 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
141 assertEquals("isPresent", true, optional.isPresent());
142 assertEquals("Data node", person, optional.get());
149 public void testReadWriteTransactionWithSingleShard() throws Exception{
150 System.setProperty("shard.persistent", "true");
151 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
152 DistributedDataStore dataStore =
153 setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
155 // 1. Create a read-write Tx
157 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
158 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
160 // 2. Write some data
162 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
163 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
164 readWriteTx.write(nodePath, nodeToWrite );
166 // 3. Read the data from Tx
168 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
169 assertEquals("exists", true, exists);
171 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
172 assertEquals("isPresent", true, optional.isPresent());
173 assertEquals("Data node", nodeToWrite, optional.get());
175 // 4. Ready the Tx for commit
177 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
183 // 6. Verify the data in the store
185 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
187 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
188 assertEquals("isPresent", true, optional.isPresent());
189 assertEquals("Data node", nodeToWrite, optional.get());
196 public void testReadWriteTransactionWithMultipleShards() throws Exception{
197 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
198 DistributedDataStore dataStore =
199 setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
201 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
202 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
204 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
205 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
207 doCommit(readWriteTx.ready());
209 readWriteTx = dataStore.newReadWriteTransaction();
211 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
212 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
214 doCommit(readWriteTx.ready());
216 readWriteTx = dataStore.newReadWriteTransaction();
218 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
219 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
220 readWriteTx.write(carPath, car);
222 MapEntryNode person = PeopleModel.newPersonEntry("jack");
223 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
224 readWriteTx.write(personPath, person);
226 Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
227 assertEquals("exists", true, exists);
229 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
230 assertEquals("isPresent", true, optional.isPresent());
231 assertEquals("Data node", car, optional.get());
233 doCommit(readWriteTx.ready());
235 // Verify the data in the store
237 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
239 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
240 assertEquals("isPresent", true, optional.isPresent());
241 assertEquals("Data node", car, optional.get());
243 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
244 assertEquals("isPresent", true, optional.isPresent());
245 assertEquals("Data node", person, optional.get());
252 public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
253 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
254 DistributedDataStore dataStore = setupDistributedDataStore(
255 "testSingleTransactionsWritesInQuickSuccession", "cars-1");
257 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
259 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
260 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
261 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
262 doCommit(writeTx.ready());
264 writeTx = txChain.newWriteOnlyTransaction();
267 for(int i = 0; i < nCars; i++) {
268 writeTx.write(CarsModel.newCarPath("car" + i),
269 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
272 doCommit(writeTx.ready());
274 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
275 CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
276 assertEquals("isPresent", true, optional.isPresent());
277 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
283 private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
284 final boolean writeOnly) throws Exception {
285 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
286 String shardName = "test-1";
288 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
289 // initialized until we create and submit the write the Tx.
290 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
291 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
292 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
294 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
296 // Create the write Tx
298 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
299 dataStore.newReadWriteTransaction();
300 assertNotNull("newReadWriteTransaction returned null", writeTx);
302 // Do some modification operations and ready the Tx on a separate thread.
304 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
305 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
306 TestModel.ID_QNAME, 1).build();
308 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
309 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
310 final CountDownLatch txReady = new CountDownLatch(1);
311 Thread txThread = new Thread() {
315 writeTx.write(TestModel.TEST_PATH,
316 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
318 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
319 TestModel.OUTER_LIST_QNAME).build());
321 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
322 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
324 writeTx.delete(listEntryPath);
326 txCohort.set(writeTx.ready());
327 } catch(Exception e) {
338 // Wait for the Tx operations to complete.
340 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
341 if(caughtEx.get() != null) {
342 throw caughtEx.get();
345 assertEquals("Tx ready", true, done);
347 // At this point the Tx operations should be waiting for the shard to initialize so
348 // trigger the latch to let the shard recovery to continue.
350 blockRecoveryLatch.countDown();
352 // Wait for the Tx commit to complete.
354 doCommit(txCohort.get());
356 // Verify the data in the store
358 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
360 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
361 get(5, TimeUnit.SECONDS);
362 assertEquals("isPresent", true, optional.isPresent());
364 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
365 assertEquals("isPresent", true, optional.isPresent());
367 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
368 assertEquals("isPresent", false, optional.isPresent());
375 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
376 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
377 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
381 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
382 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
386 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
387 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
388 String testName = "testTransactionReadsWithShardNotInitiallyReady";
389 String shardName = "test-1";
391 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
392 // initialized until we create the Tx.
393 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
394 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
395 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
397 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
399 // Create the read-write Tx
401 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
402 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
404 // Do some reads on the Tx on a separate thread.
406 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
407 new AtomicReference<>();
408 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
409 txReadFuture = new AtomicReference<>();
410 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
411 final CountDownLatch txReadsDone = new CountDownLatch(1);
412 Thread txThread = new Thread() {
416 readWriteTx.write(TestModel.TEST_PATH,
417 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
419 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
421 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
422 } catch(Exception e) {
426 txReadsDone.countDown();
433 // Wait for the Tx operations to complete.
435 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
436 if(caughtEx.get() != null) {
437 throw caughtEx.get();
440 assertEquals("Tx reads done", true, done);
442 // At this point the Tx operations should be waiting for the shard to initialize so
443 // trigger the latch to let the shard recovery to continue.
445 blockRecoveryLatch.countDown();
447 // Wait for the reads to complete and verify.
449 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
450 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
458 @Test(expected=NotInitializedException.class)
459 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
460 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
461 String testName = "testTransactionCommitFailureWithShardNotInitialized";
462 String shardName = "test-1";
464 // Set the shard initialization timeout low for the test.
466 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
468 // Setup the InMemoryJournal to block shard recovery indefinitely.
470 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
471 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
472 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
474 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
476 // Create the write Tx
478 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
479 assertNotNull("newReadWriteTransaction returned null", writeTx);
481 // Do some modifications and ready the Tx on a separate thread.
483 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
484 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
485 final CountDownLatch txReady = new CountDownLatch(1);
486 Thread txThread = new Thread() {
490 writeTx.write(TestModel.TEST_PATH,
491 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
493 txCohort.set(writeTx.ready());
494 } catch(Exception e) {
505 // Wait for the Tx operations to complete.
507 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
508 if(caughtEx.get() != null) {
509 throw caughtEx.get();
512 assertEquals("Tx ready", true, done);
514 // Wait for the commit to complete. Since the shard never initialized, the Tx should
515 // have timed out and throw an appropriate exception cause.
518 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
519 } catch(ExecutionException e) {
522 blockRecoveryLatch.countDown();
528 @Test(expected=NotInitializedException.class)
529 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
530 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
531 String testName = "testTransactionReadFailureWithShardNotInitialized";
532 String shardName = "test-1";
534 // Set the shard initialization timeout low for the test.
536 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
538 // Setup the InMemoryJournal to block shard recovery indefinitely.
540 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
541 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
542 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
544 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
546 // Create the read-write Tx
548 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
549 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
551 // Do a read on the Tx on a separate thread.
553 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
554 txReadFuture = new AtomicReference<>();
555 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
556 final CountDownLatch txReadDone = new CountDownLatch(1);
557 Thread txThread = new Thread() {
561 readWriteTx.write(TestModel.TEST_PATH,
562 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
564 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
567 } catch(Exception e) {
571 txReadDone.countDown();
578 // Wait for the Tx operations to complete.
580 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
581 if(caughtEx.get() != null) {
582 throw caughtEx.get();
585 assertEquals("Tx read done", true, done);
587 // Wait for the read to complete. Since the shard never initialized, the Tx should
588 // have timed out and throw an appropriate exception cause.
591 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
592 } catch(ReadFailedException e) {
595 blockRecoveryLatch.countDown();
601 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
602 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
603 String testName = "testTransactionCommitFailureWithNoShardLeader";
604 String shardName = "default";
606 // We don't want the shard to become the leader so prevent shard election from completing
607 // by setting the election timeout, which is based on the heartbeat interval, really high.
609 datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
610 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
612 // Set the leader election timeout low for the test.
614 datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
616 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
618 // Create the write Tx.
620 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
621 dataStore.newReadWriteTransaction();
622 assertNotNull("newReadWriteTransaction returned null", writeTx);
624 // Do some modifications and ready the Tx on a separate thread.
626 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
627 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
628 final CountDownLatch txReady = new CountDownLatch(1);
629 Thread txThread = new Thread() {
633 writeTx.write(TestModel.JUNK_PATH,
634 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
636 txCohort.set(writeTx.ready());
637 } catch(Exception e) {
648 // Wait for the Tx operations to complete.
650 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
651 if(caughtEx.get() != null) {
652 throw caughtEx.get();
655 assertEquals("Tx ready", true, done);
657 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
658 // should have timed out and throw an appropriate exception cause.
661 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
662 } catch(ExecutionException e) {
670 @Test(expected=NoShardLeaderException.class)
671 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
672 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
673 testTransactionCommitFailureWithNoShardLeader(true);
676 @Test(expected=NoShardLeaderException.class)
677 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
678 testTransactionCommitFailureWithNoShardLeader(false);
682 public void testTransactionAbort() throws Exception{
683 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
684 DistributedDataStore dataStore =
685 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
687 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
688 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
690 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
692 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
694 cohort.canCommit().get(5, TimeUnit.SECONDS);
696 cohort.abort().get(5, TimeUnit.SECONDS);
698 testWriteTransaction(dataStore, TestModel.TEST_PATH,
699 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
706 public void testTransactionChainWithSingleShard() throws Exception{
707 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
708 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
710 // 1. Create a Tx chain and write-only Tx
712 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
714 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
715 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
717 // 2. Write some data
719 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
720 writeTx.write(TestModel.TEST_PATH, testNode);
722 // 3. Ready the Tx for commit
724 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
726 // 4. Commit the Tx on another thread that first waits for the second read Tx.
728 final CountDownLatch continueCommit1 = new CountDownLatch(1);
729 final CountDownLatch commit1Done = new CountDownLatch(1);
730 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
735 continueCommit1.await();
737 } catch (Exception e) {
740 commit1Done.countDown();
745 // 5. Create a new read Tx from the chain to read and verify the data from the first
746 // Tx is visible after being readied.
748 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
749 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
750 assertEquals("isPresent", true, optional.isPresent());
751 assertEquals("Data node", testNode, optional.get());
753 // 6. Create a new RW Tx from the chain, write more data, and ready it
755 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
756 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
757 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
759 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
761 // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
762 // verify it is visible.
764 readTx = txChain.newReadWriteTransaction();
765 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
766 assertEquals("isPresent", true, optional.isPresent());
767 assertEquals("Data node", outerNode, optional.get());
769 // 8. Wait for the 2 commits to complete and close the chain.
771 continueCommit1.countDown();
772 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
774 if(commit1Error.get() != null) {
775 throw commit1Error.get();
782 // 9. Create a new read Tx from the data store and verify committed data.
784 readTx = dataStore.newReadOnlyTransaction();
785 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
786 assertEquals("isPresent", true, optional.isPresent());
787 assertEquals("Data node", outerNode, optional.get());
794 public void testTransactionChainWithMultipleShards() throws Exception{
795 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
796 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
797 "cars-1", "people-1");
799 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
801 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
802 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
804 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
805 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
807 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
808 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
810 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
812 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
814 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
815 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
816 readWriteTx.write(carPath, car);
818 MapEntryNode person = PeopleModel.newPersonEntry("jack");
819 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
820 readWriteTx.merge(personPath, person);
822 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
823 assertEquals("isPresent", true, optional.isPresent());
824 assertEquals("Data node", car, optional.get());
826 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
827 assertEquals("isPresent", true, optional.isPresent());
828 assertEquals("Data node", person, optional.get());
830 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
832 writeTx = txChain.newWriteOnlyTransaction();
834 writeTx.delete(carPath);
836 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
838 ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
839 ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
841 doCommit(canCommit1, cohort1);
842 doCommit(canCommit2, cohort2);
847 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
849 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
850 assertEquals("isPresent", false, optional.isPresent());
852 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
853 assertEquals("isPresent", true, optional.isPresent());
854 assertEquals("Data node", person, optional.get());
861 public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
862 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
863 DistributedDataStore dataStore = setupDistributedDataStore(
864 "testCreateChainedTransactionsInQuickSuccession", "cars-1");
866 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
867 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
868 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
870 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
871 DOMTransactionChain txChain = broker.createTransactionChain(listener);
873 List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
875 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
876 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
877 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
878 futures.add(writeTx.submit());
881 for(int i = 0; i < nCars; i++) {
882 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
884 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
885 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
887 futures.add(rwTx.submit());
890 for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
894 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
895 LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
896 assertEquals("isPresent", true, optional.isPresent());
897 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
908 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
909 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
910 DistributedDataStore dataStore = setupDistributedDataStore(
911 "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
913 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
915 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
919 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
921 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
922 assertEquals("isPresent", false, optional.isPresent());
931 public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
932 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
933 DistributedDataStore dataStore = setupDistributedDataStore(
934 "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
936 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
938 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
939 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
941 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
943 // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
946 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
951 public void testCreateChainedTransactionAfterClose() throws Throwable {
952 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
953 DistributedDataStore dataStore = setupDistributedDataStore(
954 "testCreateChainedTransactionAfterClose", "test-1");
956 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
960 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
962 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
967 public void testChangeListenerRegistration() throws Exception{
968 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
969 DistributedDataStore dataStore =
970 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
972 testWriteTransaction(dataStore, TestModel.TEST_PATH,
973 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
975 MockDataChangeListener listener = new MockDataChangeListener(1);
977 ListenerRegistration<MockDataChangeListener>
978 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
979 DataChangeScope.SUBTREE);
981 assertNotNull("registerChangeListener returned null", listenerReg);
983 // Wait for the initial notification
985 listener.waitForChangeEvents(TestModel.TEST_PATH);
991 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
992 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
994 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
995 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
996 testWriteTransaction(dataStore, listPath,
997 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
999 // Wait for the 2 updates.
1001 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1003 listenerReg.close();
1005 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1006 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1007 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1009 listener.expectNoMoreChanges("Received unexpected change after close");