1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.fail;
6 import static org.mockito.Matchers.any;
7 import static org.mockito.Matchers.eq;
8 import static org.mockito.Mockito.timeout;
9 import static org.mockito.Mockito.verify;
10 import akka.actor.ActorSystem;
11 import akka.actor.Address;
12 import akka.actor.AddressFromURIString;
13 import akka.cluster.Cluster;
14 import akka.testkit.JavaTestKit;
15 import com.google.common.base.Optional;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import com.typesafe.config.ConfigFactory;
22 import java.io.IOException;
23 import java.math.BigInteger;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicReference;
31 import org.junit.AfterClass;
32 import org.junit.BeforeClass;
33 import org.junit.Test;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
37 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
38 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
39 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
40 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
41 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
42 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
43 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
44 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
45 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
46 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
47 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
48 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
49 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
50 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
54 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
57 import org.opendaylight.yangtools.concepts.ListenerRegistration;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
59 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
60 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
62 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
63 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
64 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
66 public class DistributedDataStoreIntegrationTest {
// Actor system shared by all tests in this class; assigned in setUpClass() and shut down in
// tearDownClass().
68 private static ActorSystem system;
// Instance-level datastore context builder, created with a 100 ms shard heartbeat interval.
// Several tests change further settings on it before passing it to IntegrationTestKit.
70 private final DatastoreContext.Builder datastoreContextBuilder =
71 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
// Creates the shared actor system named "cluster-test" from the "Member1" section of the loaded
// configuration and joins it to the cluster at the hard-coded member-1 address.
// NOTE(review): presumably the address below matches the host/port configured under "Member1" -
// confirm against the test configuration.
74 public static void setUpClass() throws IOException {
75 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
76 Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
77 Cluster.get(system).join(member1Address);
// Shuts down the shared actor system through JavaTestKit once the class is finished with it.
81 public static void tearDownClass() throws IOException {
82 JavaTestKit.shutdownActorSystem(system);
// Supplies the ActorSystem that each test passes to IntegrationTestKit.
86 protected ActorSystem getSystem() {
// Sets up a datastore with the single shard "test-1" and runs the testWriteTransaction helper
// twice against it: once with a container node at TEST_PATH and once with a map node (built
// without entries) at OUTER_LIST_PATH.
91 public void testWriteTransactionWithSingleShard() throws Exception{
92 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
93 DistributedDataStore dataStore =
94 setupDistributedDataStore("transactionIntegrationTest", "test-1");
96 testWriteTransaction(dataStore, TestModel.TEST_PATH,
97 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
99 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
100 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
// Exercises write-only transactions that touch two shards ("cars-1" and "people-1") in the same
// Tx. Three transactions are committed in sequence - top-level containers, then the list nodes,
// then one entry per list - and the entries are read back through a read-only Tx.
107 public void testWriteTransactionWithMultipleShards() throws Exception{
108 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
109 DistributedDataStore dataStore =
110 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
// Tx 1: write the empty top-level containers for both models.
112 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
113 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
115 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
116 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
118 doCommit(writeTx.ready());
// Tx 2: write the car and person list nodes.
120 writeTx = dataStore.newWriteOnlyTransaction();
122 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
123 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
125 doCommit(writeTx.ready());
// Tx 3: write one car entry and one person entry.
127 writeTx = dataStore.newWriteOnlyTransaction();
129 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
130 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
131 writeTx.write(carPath, car);
133 MapEntryNode person = PeopleModel.newPersonEntry("jack");
134 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
135 writeTx.write(personPath, person);
137 doCommit(writeTx.ready());
139 // Verify the data in the store
141 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// Each read waits at most 5 seconds for its result.
143 Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
144 assertEquals("isPresent", true, optional.isPresent());
145 assertEquals("Data node", car, optional.get());
147 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
148 assertEquals("isPresent", true, optional.isPresent());
149 assertEquals("Data node", person, optional.get());
// Exercises a read-write transaction against the single shard "test-1": data written in the Tx
// must be visible to exists()/read() on that same Tx, and must be readable from a fresh
// read-only Tx afterwards.
156 public void testReadWriteTransactionWithSingleShard() throws Exception{
// NOTE(review): this sets a JVM-wide system property and nothing in this method restores it,
// so it stays set for whatever runs afterwards in the same JVM - confirm that is intended.
157 System.setProperty("shard.persistent", "true");
158 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
159 DistributedDataStore dataStore =
160 setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
162 // 1. Create a read-write Tx
164 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
165 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
167 // 2. Write some data
169 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
170 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
171 readWriteTx.write(nodePath, nodeToWrite );
173 // 3. Read the data from Tx
175 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
176 assertEquals("exists", true, exists);
178 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
179 assertEquals("isPresent", true, optional.isPresent());
180 assertEquals("Data node", nodeToWrite, optional.get());
182 // 4. Ready the Tx for commit
184 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
190 // 6. Verify the data in the store
192 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
194 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
195 assertEquals("isPresent", true, optional.isPresent());
196 assertEquals("Data node", nodeToWrite, optional.get());
// Read-write counterpart of the multi-shard write test: three read-write transactions spanning
// the "cars-1" and "people-1" shards are committed in sequence. The last one also reads its own
// uncommitted car entry before committing, and both entries are then verified from a read-only Tx.
203 public void testReadWriteTransactionWithMultipleShards() throws Exception{
204 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
205 DistributedDataStore dataStore =
206 setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
// Tx 1: write the empty top-level containers for both models.
208 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
209 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
211 readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
212 readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
214 doCommit(readWriteTx.ready());
// Tx 2: write the car and person list nodes.
216 readWriteTx = dataStore.newReadWriteTransaction();
218 readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
219 readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
221 doCommit(readWriteTx.ready());
// Tx 3: write one entry per list and read the car entry back before committing.
223 readWriteTx = dataStore.newReadWriteTransaction();
225 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
226 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
227 readWriteTx.write(carPath, car);
229 MapEntryNode person = PeopleModel.newPersonEntry("jack");
230 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
231 readWriteTx.write(personPath, person);
233 Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
234 assertEquals("exists", true, exists);
236 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
237 assertEquals("isPresent", true, optional.isPresent());
238 assertEquals("Data node", car, optional.get());
240 doCommit(readWriteTx.ready());
242 // Verify the data in the store
244 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
246 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
247 assertEquals("isPresent", true, optional.isPresent());
248 assertEquals("Data node", car, optional.get());
250 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
251 assertEquals("isPresent", true, optional.isPresent());
252 assertEquals("Data node", person, optional.get());
// Issues many writes back-to-back inside one chained write-only Tx on the "cars-1" shard: after
// an initial Tx creates the cars container and list, a second Tx writes nCars car entries in a
// loop. The car list is then read through the chain and its size is compared with nCars.
259 public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
260 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
261 DistributedDataStore dataStore = setupDistributedDataStore(
262 "testSingleTransactionsWritesInQuickSuccession", "cars-1");
264 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// First chained Tx: create the container and the (initial) car list.
266 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
267 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
268 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
269 doCommit(writeTx.ready());
// Second chained Tx: one write per car, named "car0", "car1", ...
271 writeTx = txChain.newWriteOnlyTransaction();
274 for(int i = 0; i < nCars; i++) {
275 writeTx.write(CarsModel.newCarPath("car" + i),
276 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
279 doCommit(writeTx.ready());
// The list node's value is treated as a Collection so that its entry count can be checked.
281 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
282 CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
283 assertEquals("isPresent", true, optional.isPresent());
284 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
// Shared body for the "shard not initially ready" write tests. Shard recovery is held back by a
// latch registered with InMemoryJournal, a Tx is created and readied on a separate thread while
// the shard is still blocked, then the latch is released and the commit and resulting data are
// verified.
//
// testName  - used as the datastore name and as the suffix of the shard's persistence id
// writeOnly - true to use a write-only Tx, false to use a read-write Tx
290 private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
291 final boolean writeOnly) throws Exception {
292 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
293 String shardName = "test-1";
295 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
296 // initialized until we create and submit the write Tx.
297 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
298 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
299 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
301 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
303 // Create the write Tx
305 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
306 dataStore.newReadWriteTransaction();
// NOTE(review): the message below names newReadWriteTransaction even when writeOnly is true.
307 assertNotNull("newReadWriteTransaction returned null", writeTx);
309 // Do some modification operations and ready the Tx on a separate thread.
311 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
312 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
313 TestModel.ID_QNAME, 1).build();
// Holders used to hand the cohort, or any exception, from the Tx thread back to this thread.
315 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
316 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
317 final CountDownLatch txReady = new CountDownLatch(1);
318 Thread txThread = new Thread() {
// The Tx performs a write, a merge, a write of a list entry and a delete of that same entry.
322 writeTx.write(TestModel.TEST_PATH,
323 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
325 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
326 TestModel.OUTER_LIST_QNAME).build());
328 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
329 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
331 writeTx.delete(listEntryPath);
333 txCohort.set(writeTx.ready());
334 } catch(Exception e) {
345 // Wait for the Tx operations to complete.
347 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Surface a failure from the Tx thread before checking whether the wait itself timed out.
348 if(caughtEx.get() != null) {
349 throw caughtEx.get();
352 assertEquals("Tx ready", true, done);
354 // At this point the Tx operations should be waiting for the shard to initialize so
355 // trigger the latch to let the shard recovery continue.
357 blockRecoveryLatch.countDown();
359 // Wait for the Tx commit to complete.
361 doCommit(txCohort.get());
363 // Verify the data in the store
365 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
367 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
368 get(5, TimeUnit.SECONDS);
369 assertEquals("isPresent", true, optional.isPresent());
371 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
372 assertEquals("isPresent", true, optional.isPresent());
// The list entry was written and then deleted in the same Tx, so it must not be present.
374 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
375 assertEquals("isPresent", false, optional.isPresent());
// Runs the shared "shard not initially ready" scenario with a write-only Tx, after switching on
// the write-only transaction optimizations in the datastore context.
382 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
383 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
384 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
// Runs the shared "shard not initially ready" scenario with a read-write Tx.
388 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
389 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
// Verifies that exists()/read() issued on a read-write Tx while shard recovery is still blocked
// complete successfully once recovery is released. The operations are issued on a separate
// thread and their futures are only resolved after the recovery latch has been counted down.
393 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
394 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
395 String testName = "testTransactionReadsWithShardNotInitiallyReady";
396 String shardName = "test-1";
398 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
399 // initialized until we create the Tx.
400 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
401 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
402 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
404 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
406 // Create the read-write Tx
408 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
409 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
411 // Do some reads on the Tx on a separate thread.
// Holders used to hand the pending futures, or any exception, back to this thread.
413 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
414 new AtomicReference<>();
415 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
416 txReadFuture = new AtomicReference<>();
417 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
418 final CountDownLatch txReadsDone = new CountDownLatch(1);
419 Thread txThread = new Thread() {
// Write first so that the subsequent exists()/read() on the same path have data to find.
423 readWriteTx.write(TestModel.TEST_PATH,
424 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
426 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
428 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
429 } catch(Exception e) {
433 txReadsDone.countDown();
440 // Wait for the Tx operations to complete.
442 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
443 if(caughtEx.get() != null) {
444 throw caughtEx.get();
447 assertEquals("Tx reads done", true, done);
449 // At this point the Tx operations should be waiting for the shard to initialize so
450 // trigger the latch to let the shard recovery continue.
452 blockRecoveryLatch.countDown();
454 // Wait for the reads to complete and verify.
456 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
457 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
// Verifies that committing a write-only Tx fails with NotInitializedException when the shard
// never finishes initializing. Shard recovery is blocked through InMemoryJournal and the shard
// initialization timeout is lowered to 300 ms so that the failure surfaces quickly; the Tx is
// readied on a separate thread and canCommit() is then awaited on this thread.
465 @Test(expected=NotInitializedException.class)
466 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
467 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
468 String testName = "testTransactionCommitFailureWithShardNotInitialized";
469 String shardName = "test-1";
471 // Set the shard initialization timeout low for the test.
473 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
475 // Setup the InMemoryJournal to block shard recovery indefinitely.
477 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
478 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
479 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
481 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
483 // Create the write Tx
485 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// The message names the factory method that was actually called, so a failure points at the
// right API.
486 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
488 // Do some modifications and ready the Tx on a separate thread.
// Holders used to hand the cohort, or any exception, from the Tx thread back to this thread.
490 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
491 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
492 final CountDownLatch txReady = new CountDownLatch(1);
493 Thread txThread = new Thread() {
497 writeTx.write(TestModel.TEST_PATH,
498 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
500 txCohort.set(writeTx.ready());
501 } catch(Exception e) {
512 // Wait for the Tx operations to complete.
514 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Surface a failure from the Tx thread before checking whether the wait itself timed out.
515 if(caughtEx.get() != null) {
516 throw caughtEx.get();
519 assertEquals("Tx ready", true, done);
521 // Wait for the commit to complete. Since the shard never initialized, the Tx should
522 // have timed out and throw an appropriate exception cause.
525 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
526 } catch(ExecutionException e) {
// Release the recovery latch so the blocked shard recovery is not left waiting on it.
529 blockRecoveryLatch.countDown();
// Verifies that a read on a read-write Tx fails with NotInitializedException when the shard
// never finishes initializing. Shard recovery is blocked through InMemoryJournal and the shard
// initialization timeout is lowered to 300 ms; the read is issued on a separate thread and its
// future is then awaited on this thread.
535 @Test(expected=NotInitializedException.class)
536 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
537 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
538 String testName = "testTransactionReadFailureWithShardNotInitialized";
539 String shardName = "test-1";
541 // Set the shard initialization timeout low for the test.
543 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
545 // Setup the InMemoryJournal to block shard recovery indefinitely.
547 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
548 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
549 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
551 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
553 // Create the read-write Tx
555 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
556 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
558 // Do a read on the Tx on a separate thread.
// Holders used to hand the pending read future, or any exception, back to this thread.
560 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
561 txReadFuture = new AtomicReference<>();
562 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
563 final CountDownLatch txReadDone = new CountDownLatch(1);
564 Thread txThread = new Thread() {
568 readWriteTx.write(TestModel.TEST_PATH,
569 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
571 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
574 } catch(Exception e) {
578 txReadDone.countDown();
585 // Wait for the Tx operations to complete.
587 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
// Surface a failure from the Tx thread before checking whether the wait itself timed out.
588 if(caughtEx.get() != null) {
589 throw caughtEx.get();
592 assertEquals("Tx read done", true, done);
594 // Wait for the read to complete. Since the shard never initialized, the Tx should
595 // have timed out and throw an appropriate exception cause.
598 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
599 } catch(ReadFailedException e) {
// Release the recovery latch so the blocked shard recovery is not left waiting on it.
602 blockRecoveryLatch.countDown();
// Shared body for the "no shard leader" commit-failure tests. The datastore context is tuned so
// that no leader is expected to be elected in time (30 s heartbeat interval, 1 ms leader
// election timeout), a Tx is readied on a separate thread, and canCommit() is then awaited.
// Both callers declare NoShardLeaderException as the expected outcome.
//
// writeOnly - true to use a write-only Tx, false to use a read-write Tx
//
// Note that the same datastore name is used regardless of which caller invokes this helper.
608 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
609 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
610 String testName = "testTransactionCommitFailureWithNoShardLeader";
611 String shardName = "default";
613 // We don't want the shard to become the leader so prevent shard election from completing
614 // by setting the election timeout, which is based on the heartbeat interval, really high.
616 datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
617 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
619 // Set the leader election timeout low for the test.
621 datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
623 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
625 // Create the write Tx.
627 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
628 dataStore.newReadWriteTransaction();
// NOTE(review): the message below names newReadWriteTransaction even when writeOnly is true.
629 assertNotNull("newReadWriteTransaction returned null", writeTx);
631 // Do some modifications and ready the Tx on a separate thread.
// Holders used to hand the cohort, or any exception, from the Tx thread back to this thread.
633 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
634 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
635 final CountDownLatch txReady = new CountDownLatch(1);
636 Thread txThread = new Thread() {
640 writeTx.write(TestModel.JUNK_PATH,
641 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
643 txCohort.set(writeTx.ready());
644 } catch(Exception e) {
655 // Wait for the Tx operations to complete.
657 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Surface a failure from the Tx thread before checking whether the wait itself timed out.
658 if(caughtEx.get() != null) {
659 throw caughtEx.get();
662 assertEquals("Tx ready", true, done);
664 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
665 // should have timed out and throw an appropriate exception cause.
668 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
669 } catch(ExecutionException e) {
// Runs the shared "no shard leader" scenario with a write-only Tx, after switching on the
// write-only transaction optimizations; expects NoShardLeaderException.
677 @Test(expected=NoShardLeaderException.class)
678 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
679 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
680 testTransactionCommitFailureWithNoShardLeader(true);
// Runs the shared "no shard leader" scenario with a read-write Tx; expects
// NoShardLeaderException.
683 @Test(expected=NoShardLeaderException.class)
684 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
685 testTransactionCommitFailureWithNoShardLeader(false);
// Readies a write-only Tx on the "test-1" shard, runs canCommit() and then abort() on its cohort
// (each awaited for up to 5 seconds), and afterwards performs a regular write of the same node
// through the testWriteTransaction helper.
689 public void testTransactionAbort() throws Exception{
690 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
691 DistributedDataStore dataStore =
692 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
694 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
695 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
697 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
699 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Start the three-phase commit, then abort it instead of completing it.
701 cohort.canCommit().get(5, TimeUnit.SECONDS);
703 cohort.abort().get(5, TimeUnit.SECONDS);
// Write the same path again after the abort.
705 testWriteTransaction(dataStore, TestModel.TEST_PATH,
706 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Exercises a transaction chain on the single shard "test-1": data from a readied but not yet
// committed Tx must be visible to later transactions created from the same chain. The first
// commit is deliberately held back on another thread (via the continueCommit1 latch) until the
// chained reads have been verified; the committed data is finally read straight from the
// datastore.
713 public void testTransactionChainWithSingleShard() throws Exception{
714 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
715 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
717 // 1. Create a Tx chain and write-only Tx
719 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
721 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
722 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
724 // 2. Write some data
726 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
727 writeTx.write(TestModel.TEST_PATH, testNode);
729 // 3. Ready the Tx for commit
731 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
733 // 4. Commit the Tx on another thread that first waits for the second read Tx.
// continueCommit1 gates the start of the commit; commit1Done signals that the commit thread
// has finished; commit1Error carries any exception from that thread back to this one.
735 final CountDownLatch continueCommit1 = new CountDownLatch(1);
736 final CountDownLatch commit1Done = new CountDownLatch(1);
737 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
742 continueCommit1.await();
744 } catch (Exception e) {
747 commit1Done.countDown();
752 // 5. Create a new read Tx from the chain to read and verify the data from the first
753 // Tx is visible after being readied.
755 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
756 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
757 assertEquals("isPresent", true, optional.isPresent());
758 assertEquals("Data node", testNode, optional.get());
760 // 6. Create a new RW Tx from the chain, write more data, and ready it
762 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
763 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
764 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
766 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
768 // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
769 // verify it is visible.
// NOTE(review): the comment above says "read Tx" but the call below creates a read-write Tx -
// confirm which is intended.
771 readTx = txChain.newReadWriteTransaction();
772 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
773 assertEquals("isPresent", true, optional.isPresent());
774 assertEquals("Data node", outerNode, optional.get());
776 // 8. Wait for the 2 commits to complete and close the chain.
778 continueCommit1.countDown();
// NOTE(review): the boolean result of this timed wait is not checked, so a timeout here is
// not reported by this line.
779 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
781 if(commit1Error.get() != null) {
782 throw commit1Error.get();
789 // 9. Create a new read Tx from the data store and verify committed data.
791 readTx = dataStore.newReadOnlyTransaction();
792 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
793 assertEquals("isPresent", true, optional.isPresent());
794 assertEquals("Data node", outerNode, optional.get());
// Exercises a transaction chain spanning the "cars-1" and "people-1" shards. Three chained
// transactions are readied one after another - initial containers and lists, then a car and a
// person entry (read back within that Tx), then a delete of the car - and the final datastore
// state is expected to contain the person but not the car.
801 public void testTransactionChainWithMultipleShards() throws Exception{
802 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
803 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
804 "cars-1", "people-1");
806 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// Chained Tx 1: containers and list nodes for both models.
808 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
809 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
811 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
812 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
814 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
815 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
817 DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
// Chained Tx 2: created while Tx 1 is only readied; writes a car, merges a person and reads
// both back within the same Tx.
819 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
821 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
822 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
823 readWriteTx.write(carPath, car);
825 MapEntryNode person = PeopleModel.newPersonEntry("jack");
826 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
827 readWriteTx.merge(personPath, person);
829 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
830 assertEquals("isPresent", true, optional.isPresent());
831 assertEquals("Data node", car, optional.get());
833 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
834 assertEquals("isPresent", true, optional.isPresent());
835 assertEquals("Data node", person, optional.get());
837 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
// Chained Tx 3: delete the car written by Tx 2.
839 writeTx = txChain.newWriteOnlyTransaction();
841 writeTx.delete(carPath);
843 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
// canCommit() is invoked on both cohorts before either commit is driven to completion.
845 ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
846 ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
848 doCommit(canCommit1, cohort1);
849 doCommit(canCommit2, cohort2);
// Verify the final state through a read-only Tx taken directly from the datastore.
854 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
856 optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
857 assertEquals("isPresent", false, optional.isPresent());
859 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
860 assertEquals("isPresent", true, optional.isPresent());
861 assertEquals("Data node", person, optional.get());
// Creates and submits many chained read-write transactions back-to-back through a
// ConcurrentDOMDataBroker backed by the datastore (registered as CONFIGURATION). One initial Tx
// creates the cars container and list, then nCars transactions each merge one car entry; the
// submit futures are collected and the final list size is compared with nCars.
868 public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
869 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
870 DistributedDataStore dataStore = setupDistributedDataStore(
871 "testCreateChainedTransactionsInQuickSuccession", "cars-1");
// The broker is given a direct executor as its second constructor argument.
873 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
874 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
875 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
// A mock listener is sufficient for creating the chain.
877 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
878 DOMTransactionChain txChain = broker.createTransactionChain(listener);
880 List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
882 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
883 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
884 writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
885 futures.add(writeTx.submit());
// Each iteration creates a new chained Tx and submits it without waiting on the previous
// submit's future.
888 for(int i = 0; i < nCars; i++) {
889 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
891 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
892 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
894 futures.add(rwTx.submit());
897 for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
// The list node's value is treated as a Collection so that its entry count can be checked.
901 Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
902 LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
903 assertEquals("isPresent", true, optional.isPresent());
904 assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
// Creates two read-write transactions from the same chain on the "test-1" shard; nothing is
// written through the first one, and a read of TEST_PATH through the second is expected to find
// no data.
915 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
916 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
917 DistributedDataStore dataStore = setupDistributedDataStore(
918 "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
920 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
922 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
926 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
928 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
929 assertEquals("isPresent", false, optional.isPresent());
/**
 * Opens a write-only transaction on a transaction chain, writes to it, and then asserts
 * via {@code assertExceptionOnTxChainCreates} that creating further transactions on the
 * chain is rejected with {@code IllegalStateException}.
 *
 * @throws Throwable propagated from the setup or assertion helpers
 */
938 public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
939 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
940 DistributedDataStore dataStore = setupDistributedDataStore(
941 "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
943 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
945 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
946 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// Write a container node at TEST_PATH; no ready() call on writeTx is visible in this test.
948 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
950 // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
// NOTE(review): the sentence above is cut off in this view; per the test name the
// intended ending is that the previous transaction has not been readied.
953 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
/**
 * Asserts via {@code assertExceptionOnTxChainCreates} that creating transactions on a
 * transaction chain is rejected with {@code TransactionChainClosedException}.
 *
 * NOTE(review): per the test name and the inline comment the chain is closed before the
 * assertion; the close call itself is not among the lines shown here - confirm.
 *
 * @throws Throwable propagated from the setup or assertion helpers
 */
958 public void testCreateChainedTransactionAfterClose() throws Throwable {
959 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
960 DistributedDataStore dataStore = setupDistributedDataStore(
961 "testCreateChainedTransactionAfterClose", "test-1");
963 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
967 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
969 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
/**
 * Merges a malformed cars container through a chained read-write transaction and checks
 * that the submit fails with {@code TransactionCommitFailedException} and that the chain
 * listener's {@code onTransactionChainFailed} callback is invoked for that transaction.
 *
 * @throws Exception if setup fails or an unexpected exception type is thrown
 */
974 public void testChainedTransactionFailureWithSingleShard() throws Exception{
975 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
976 DistributedDataStore dataStore = setupDistributedDataStore(
977 "testChainedTransactionFailureWithSingleShard", "cars-1");
// Broker serving the datastore as the CONFIGURATION store, handed MoreExecutors.directExecutor().
979 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
980 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
981 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
// Mocked listener so the failure callback can be verified below.
983 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
984 DOMTransactionChain txChain = broker.createTransactionChain(listener);
986 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
// Build a container identified by the cars base QName whose only child is a leaf
// named with TestModel.JUNK_QNAME; the variable name marks it as invalid data.
988 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
989 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
990 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
992 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
// Submit and wait up to 5 seconds; reaching fail() means the commit was not rejected.
// (The opening "try {" for the catch below is not among the lines shown here.)
995 rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
996 fail("Expected TransactionCommitFailedException");
997 } catch (TransactionCommitFailedException e) {
// The listener must be told about the failure for this chain and this transaction;
// Mockito's timeout(5000) allows up to 5000 ms for the call to arrive.
1001 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
/**
 * Uses one chained write-only transaction to put a valid people container and merge a
 * malformed cars container, then checks that the submit fails with
 * {@code TransactionCommitFailedException} and that the chain listener's
 * {@code onTransactionChainFailed} callback is invoked for that transaction.
 *
 * @throws Exception if setup fails or an unexpected exception type is thrown
 */
1010 public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1011 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
// Unlike the single-shard variant, setup here is passed both "cars-1" and "people-1".
1012 DistributedDataStore dataStore = setupDistributedDataStore(
1013 "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
// Broker serving the datastore as the CONFIGURATION store, handed MoreExecutors.directExecutor().
1015 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1016 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1017 LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
// Mocked listener so the failure callback can be verified below.
1019 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1020 DOMTransactionChain txChain = broker.createTransactionChain(listener);
1022 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
// Valid part of the transaction: an empty people container.
1024 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
// Invalid part: a container identified by the cars base QName whose only child is a
// leaf named with TestModel.JUNK_QNAME.
1026 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1027 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1028 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1030 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1031 // done for put for performance reasons.
1032 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
// Submit and wait up to 5 seconds; reaching fail() means the commit was not rejected.
// (The opening "try {" for the catch below is not among the lines shown here.)
1035 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1036 fail("Expected TransactionCommitFailedException");
1037 } catch (TransactionCommitFailedException e) {
// The listener must be told about the failure for this chain and this transaction;
// Mockito's timeout(5000) allows up to 5000 ms for the call to arrive.
1041 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
/**
 * Registers a data change listener on {@code TestModel.TEST_PATH} with SUBTREE scope,
 * checks that it is notified of existing and subsequently written data, and checks
 * that no further changes are reported once the registration has been closed.
 *
 * NOTE(review): the end of this method is not among the lines shown here, and some
 * statements between the visible ones may be missing - confirm against the full file.
 *
 * @throws Exception if setup or any write transaction fails
 */
1050 public void testChangeListenerRegistration() throws Exception{
1051 new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1052 DistributedDataStore dataStore =
1053 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
// Write the test container before any listener is registered.
1055 testWriteTransaction(dataStore, TestModel.TEST_PATH,
1056 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// presumably the argument is the number of change events to expect - TODO confirm
1058 MockDataChangeListener listener = new MockDataChangeListener(1);
1060 ListenerRegistration<MockDataChangeListener>
1061 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1062 DataChangeScope.SUBTREE);
1064 assertNotNull("registerChangeListener returned null", listenerReg);
1066 // Wait for the initial notification
1068 listener.waitForChangeEvents(TestModel.TEST_PATH);
// Write an empty outer list node, then a single list entry keyed by ID_QNAME = 1.
1074 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1075 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1077 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1078 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1079 testWriteTransaction(dataStore, listPath,
1080 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1082 // Wait for the 2 updates.
1084 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
// Close the registration, then write a second list entry (ID_QNAME = 2); the listener
// is expected to see nothing from this write.
1086 listenerReg.close();
1088 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1089 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1090 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1092 listener.expectNoMoreChanges("Received unexpected change after close");