1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.fail;
6 import akka.actor.ActorRef;
7 import akka.actor.ActorSystem;
8 import akka.actor.PoisonPill;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import com.google.common.util.concurrent.Uninterruptibles;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.CountDownLatch;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicReference;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
22 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
23 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
24 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
25 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
26 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
27 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
28 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
29 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
31 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
// Integration tests that exercise DistributedDataStore transactions, transaction chains and
// change-listener registration through the IntegrationTestKit helper defined further down
// in this file.
45 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
// Builder for the DatastoreContext handed to each DistributedDataStore created by
// setupDistributedDataStore. It starts with a 100 ms shard heartbeat interval and is
// adjusted by individual tests before the data store is created.
47 private final DatastoreContext.Builder datastoreContextBuilder =
48 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
// Exercises the testWriteTransaction helper twice against a data store set up with the
// shard name "test-1": first for the test container, then for an empty outer list.
51 public void testWriteTransactionWithSingleShard() throws Exception{
52 new IntegrationTestKit(getSystem()) {{
53 DistributedDataStore dataStore =
54 setupDistributedDataStore("transactionIntegrationTest", "test-1");
// First write: the top-level test container.
56 testWriteTransaction(dataStore, TestModel.TEST_PATH,
57 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Second write: an outer list node built without any entries.
59 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
60 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
// Verifies that a single write-only transaction can write to two different subtrees
// (CarsModel.BASE_PATH and PeopleModel.BASE_PATH) and that both nodes can be read back.
// The data store is set up with the shard names "cars-1" and "people-1".
67 public void testWriteTransactionWithMultipleShards() throws Exception{
68 new IntegrationTestKit(getSystem()) {{
69 DistributedDataStore dataStore =
70 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
72 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
73 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// Write an empty cars container.
75 YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
76 NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
77 writeTx.write(nodePath1, nodeToWrite1);
// Write an empty people container in the same transaction.
79 YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
80 NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
81 writeTx.write(nodePath2, nodeToWrite2);
83 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
87 // Verify the data in the store
89 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// NOTE(review): these reads call get() with no timeout, whereas the other tests in this
// file use get(5, TimeUnit.SECONDS); a read that never completes would hang the test.
91 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
92 assertEquals("isPresent", true, optional.isPresent());
93 assertEquals("Data node", nodeToWrite1, optional.get());
95 optional = readTx.read(nodePath2).get();
96 assertEquals("isPresent", true, optional.isPresent());
97 assertEquals("Data node", nodeToWrite2, optional.get());
// Verifies that data written through a read-write transaction is visible to exists() and
// read() on that same transaction, and is readable from a new read-only transaction later.
104 public void testReadWriteTransaction() throws Exception{
// NOTE(review): this JVM-wide system property is not restored anywhere in the visible
// code, so it may leak into other tests running in the same JVM - confirm this is intended.
105 System.setProperty("shard.persistent", "true");
106 new IntegrationTestKit(getSystem()) {{
107 DistributedDataStore dataStore =
108 setupDistributedDataStore("testReadWriteTransaction", "test-1");
110 // 1. Create a read-write Tx
112 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
113 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
115 // 2. Write some data
117 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
118 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
119 readWriteTx.write(nodePath, nodeToWrite );
121 // 3. Read the data from Tx
// Both calls go to the same, still-open transaction, so they must see its own write.
123 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
124 assertEquals("exists", true, exists);
126 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
127 assertEquals("isPresent", true, optional.isPresent());
128 assertEquals("Data node", nodeToWrite, optional.get());
130 // 4. Ready the Tx for commit
132 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
138 // 6. Verify the data in the store
140 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
142 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
143 assertEquals("isPresent", true, optional.isPresent());
144 assertEquals("Data node", nodeToWrite, optional.get());
// Shared scenario: modifications made on a transaction while the shard is still blocked in
// recovery must be applied once recovery is released and the transaction is committed.
//
// testName  - used as the data store type name and as the suffix of the journal persistence ID.
// writeOnly - true to use a write-only transaction, false to use a read-write transaction.
150 private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
151 final boolean writeOnly) throws Exception {
152 new IntegrationTestKit(getSystem()) {{
153 String shardName = "test-1";
155 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
156 // initialized until we create and submit the write Tx.
157 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
158 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
159 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// Pass waitUntilLeader=false: the shard cannot become leader while recovery is blocked.
161 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
163 // Create the write Tx
165 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
166 dataStore.newReadWriteTransaction();
// NOTE(review): the message names newReadWriteTransaction even when writeOnly is true and
// the transaction came from newWriteOnlyTransaction.
167 assertNotNull("newReadWriteTransaction returned null", writeTx);
169 // Do some modification operations and ready the Tx on a separate thread.
171 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
172 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
173 TestModel.ID_QNAME, 1).build();
// Hand-off from the Tx thread to the test thread: the readied cohort, any exception the
// thread caught (rethrown below), and a latch the test thread waits on.
175 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
176 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
177 final CountDownLatch txReady = new CountDownLatch(1);
178 Thread txThread = new Thread() {
182 writeTx.write(TestModel.TEST_PATH,
183 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
185 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
186 TestModel.OUTER_LIST_QNAME).build());
188 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
189 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
// The entry written above is deleted again, so it is expected to be absent at the end.
191 writeTx.delete(listEntryPath);
193 txCohort.set(writeTx.ready());
194 } catch(Exception e) {
205 // Wait for the Tx operations to complete.
207 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
// Surface a failure from the Tx thread before checking the latch result.
208 if(caughtEx.get() != null) {
209 throw caughtEx.get();
212 assertEquals("Tx ready", true, done);
214 // At this point the Tx operations should be waiting for the shard to initialize so
215 // trigger the latch to let the shard recovery continue.
217 blockRecoveryLatch.countDown();
219 // Wait for the Tx commit to complete.
221 doCommit(txCohort.get());
223 // Verify the data in the store
225 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
227 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
228 get(5, TimeUnit.SECONDS);
229 assertEquals("isPresent", true, optional.isPresent());
231 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
232 assertEquals("isPresent", true, optional.isPresent());
// The list entry was written and then deleted in the same Tx, so it must not exist.
234 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
235 assertEquals("isPresent", false, optional.isPresent());
// Runs the "shard not initially ready" write scenario with a write-only transaction, after
// enabling write-only transaction optimizations on the shared datastore context builder.
242 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
243 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
244 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
// Runs the "shard not initially ready" write scenario with a read-write transaction.
248 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
249 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
// Verifies that exists() and read() issued on a read-write transaction while shard recovery
// is blocked return futures that complete successfully once recovery is released.
253 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
254 new IntegrationTestKit(getSystem()) {{
255 String testName = "testTransactionReadsWithShardNotInitiallyReady";
256 String shardName = "test-1";
258 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
259 // initialized until we create the Tx.
260 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
261 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
262 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// Pass waitUntilLeader=false: the shard cannot become leader while recovery is blocked.
264 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
266 // Create the read-write Tx
268 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
269 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
271 // Do some reads on the Tx on a separate thread.
// Hand-off from the Tx thread: the two pending futures, any exception the thread caught
// (rethrown below), and a latch counted down when the thread is finished.
273 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
274 new AtomicReference<>();
275 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
276 txReadFuture = new AtomicReference<>();
277 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
278 final CountDownLatch txReadsDone = new CountDownLatch(1);
279 Thread txThread = new Thread() {
// Write first so that the subsequent exists/read have data to find.
283 readWriteTx.write(TestModel.TEST_PATH,
284 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
286 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
288 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
289 } catch(Exception e) {
293 txReadsDone.countDown();
300 // Wait for the Tx operations to complete.
302 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
// Surface a failure from the Tx thread before checking the latch result.
303 if(caughtEx.get() != null) {
304 throw caughtEx.get();
307 assertEquals("Tx reads done", true, done);
309 // At this point the Tx operations should be waiting for the shard to initialize so
310 // trigger the latch to let the shard recovery continue.
312 blockRecoveryLatch.countDown();
314 // Wait for the reads to complete and verify.
316 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
317 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
// Verifies that canCommit on a write-only transaction fails when the shard never finishes
// initializing: recovery stays blocked and the shard initialization timeout is set to 300 ms.
// The test passes only if a NotInitializedException propagates out of the method.
325 @Test(expected=NotInitializedException.class)
326 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
327 new IntegrationTestKit(getSystem()) {{
328 String testName = "testTransactionCommitFailureWithShardNotInitialized";
329 String shardName = "test-1";
331 // Set the shard initialization timeout low for the test.
333 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
335 // Setup the InMemoryJournal to block shard recovery indefinitely.
337 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
338 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
339 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
341 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
343 // Create the write Tx
345 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// NOTE(review): the message names newReadWriteTransaction although the transaction was
// created with newWriteOnlyTransaction.
346 assertNotNull("newReadWriteTransaction returned null", writeTx);
348 // Do some modifications and ready the Tx on a separate thread.
// Hand-off from the Tx thread: the readied cohort, any exception the thread caught
// (rethrown below), and a latch the test thread waits on.
350 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
351 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
352 final CountDownLatch txReady = new CountDownLatch(1);
353 Thread txThread = new Thread() {
357 writeTx.write(TestModel.TEST_PATH,
358 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
360 txCohort.set(writeTx.ready());
361 } catch(Exception e) {
372 // Wait for the Tx operations to complete.
374 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
375 if(caughtEx.get() != null) {
376 throw caughtEx.get();
379 assertEquals("Tx ready", true, done);
381 // Wait for the commit to complete. Since the shard never initialized, the Tx should
382 // have timed out and thrown an appropriate exception cause.
385 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
// NOTE(review): the future wraps failures in ExecutionException while the test expects
// NotInitializedException, so the cause is presumably unwrapped and rethrown here - confirm.
386 } catch(ExecutionException e) {
// Release the blocked recovery so the shard is not left stuck after the test.
389 blockRecoveryLatch.countDown();
// Verifies that a read on a read-write transaction fails when the shard never finishes
// initializing: recovery stays blocked and the shard initialization timeout is set to 300 ms.
// The test passes only if a NotInitializedException propagates out of the method.
395 @Test(expected=NotInitializedException.class)
396 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
397 new IntegrationTestKit(getSystem()) {{
398 String testName = "testTransactionReadFailureWithShardNotInitialized";
399 String shardName = "test-1";
401 // Set the shard initialization timeout low for the test.
403 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
405 // Setup the InMemoryJournal to block shard recovery indefinitely.
407 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
408 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
409 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
411 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
413 // Create the read-write Tx
415 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
416 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
418 // Do a read on the Tx on a separate thread.
// Hand-off from the Tx thread: the pending read future, any exception the thread caught
// (rethrown below), and a latch counted down when the thread is finished.
420 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
421 txReadFuture = new AtomicReference<>();
422 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
423 final CountDownLatch txReadDone = new CountDownLatch(1);
424 Thread txThread = new Thread() {
428 readWriteTx.write(TestModel.TEST_PATH,
429 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
431 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
434 } catch(Exception e) {
438 txReadDone.countDown();
445 // Wait for the Tx operations to complete.
447 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
448 if(caughtEx.get() != null) {
449 throw caughtEx.get();
452 assertEquals("Tx read done", true, done);
454 // Wait for the read to complete. Since the shard never initialized, the Tx should
455 // have timed out and thrown an appropriate exception cause.
458 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
// NOTE(review): checkedGet reports failures as ReadFailedException while the test expects
// NotInitializedException, so the cause is presumably unwrapped and rethrown here - confirm.
459 } catch(ReadFailedException e) {
// Release the blocked recovery so the shard is not left stuck after the test.
462 blockRecoveryLatch.countDown();
// Shared scenario: canCommit must fail when no shard leader is elected in time. The
// heartbeat interval is raised to 30000 ms and the leader election timeout lowered to 1 ms
// before the data store is created.
//
// writeOnly - true to use a write-only transaction, false to use a read-write transaction.
468 private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
469 new IntegrationTestKit(getSystem()) {{
// NOTE(review): both callers (writeOnly true and false) end up with this same type name;
// confirm that two data stores sharing a name in one actor system is harmless.
470 String testName = "testTransactionCommitFailureWithNoShardLeader";
471 String shardName = "default";
473 // We don't want the shard to become the leader so prevent shard election from completing
474 // by setting the election timeout, which is based on the heartbeat interval, really high.
476 datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
477 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
479 // Set the leader election timeout low for the test.
481 datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
// Pass waitUntilLeader=false: this test relies on the shard never becoming leader.
483 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
485 // Create the write Tx.
487 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
488 dataStore.newReadWriteTransaction();
// NOTE(review): the message names newReadWriteTransaction even when writeOnly is true.
489 assertNotNull("newReadWriteTransaction returned null", writeTx);
491 // Do some modifications and ready the Tx on a separate thread.
// Hand-off from the Tx thread: the readied cohort, any exception the thread caught
// (rethrown below), and a latch the test thread waits on.
493 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
494 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
495 final CountDownLatch txReady = new CountDownLatch(1);
496 Thread txThread = new Thread() {
500 writeTx.write(TestModel.JUNK_PATH,
501 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
503 txCohort.set(writeTx.ready());
504 } catch(Exception e) {
515 // Wait for the Tx operations to complete.
517 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
518 if(caughtEx.get() != null) {
519 throw caughtEx.get();
522 assertEquals("Tx ready", true, done);
524 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
525 // should have timed out and thrown an appropriate exception cause.
528 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
// NOTE(review): callers expect NoShardLeaderException, so the ExecutionException's cause
// is presumably unwrapped and rethrown here - confirm.
529 } catch(ExecutionException e) {
// Runs the no-shard-leader commit scenario with a write-only transaction, after enabling
// write-only transaction optimizations; passes only if NoShardLeaderException is thrown.
537 @Test(expected=NoShardLeaderException.class)
538 public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
539 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
540 testTransactionCommitFailureWithNoShardLeader(true);
// Runs the no-shard-leader commit scenario with a read-write transaction; passes only if
// NoShardLeaderException is thrown.
543 @Test(expected=NoShardLeaderException.class)
544 public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
545 testTransactionCommitFailureWithNoShardLeader(false);
// Verifies that a transaction can be aborted after canCommit, and that a subsequent write
// transaction to the same path still succeeds.
549 public void testTransactionAbort() throws Exception{
// NOTE(review): this JVM-wide system property is not restored anywhere in the visible
// code, so it may leak into other tests running in the same JVM - confirm this is intended.
550 System.setProperty("shard.persistent", "true");
551 new IntegrationTestKit(getSystem()) {{
552 DistributedDataStore dataStore =
553 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
555 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
556 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
558 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
560 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Start the commit protocol, then abort instead of proceeding to preCommit/commit.
562 cohort.canCommit().get(5, TimeUnit.SECONDS);
564 cohort.abort().get(5, TimeUnit.SECONDS);
// A fresh write to the same path must go through after the abort.
566 testWriteTransaction(dataStore, TestModel.TEST_PATH,
567 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Verifies transaction-chain visibility: data from a readied (not yet committed) chained
// transaction is visible to later transactions on the same chain, and the final data is
// readable from the data store itself at the end.
574 public void testTransactionChain() throws Exception{
575 new IntegrationTestKit(getSystem()) {{
576 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
578 // 1. Create a Tx chain and write-only Tx
580 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
582 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
583 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
585 // 2. Write some data
587 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
588 writeTx.write(TestModel.TEST_PATH, testNode);
590 // 3. Ready the Tx for commit
592 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
594 // 4. Commit the Tx on another thread that first waits for the second read Tx.
// continueCommit1 gates the commit thread; commit1Done signals that it has finished;
// commit1Error carries any exception from that thread back to the test thread.
596 final CountDownLatch continueCommit1 = new CountDownLatch(1);
597 final CountDownLatch commit1Done = new CountDownLatch(1);
598 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
603 continueCommit1.await();
605 } catch (Exception e) {
608 commit1Done.countDown();
613 // 5. Create a new read Tx from the chain to read and verify the data from the first
614 // Tx is visible after being readied.
616 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
617 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
618 assertEquals("isPresent", true, optional.isPresent());
619 assertEquals("Data node", testNode, optional.get());
621 // 6. Create a new RW Tx from the chain, write more data, and ready it
623 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
624 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
625 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
627 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
629 // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
630 // verify it is visible.
// NOTE(review): the comment says "read Tx" but the code creates a read-write transaction
// (assigned to a DOMStoreReadTransaction variable) - confirm which one is intended.
632 readTx = txChain.newReadWriteTransaction();
633 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
634 assertEquals("isPresent", true, optional.isPresent());
635 assertEquals("Data node", outerNode, optional.get());
637 // 8. Wait for the 2 commits to complete and close the chain.
639 continueCommit1.countDown();
// NOTE(review): the boolean result of this wait is ignored, so a commit thread that has
// not finished within 5 seconds is not reported here.
640 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
642 if(commit1Error.get() != null) {
643 throw commit1Error.get();
650 // 9. Create a new read Tx from the data store and verify committed data.
652 readTx = dataStore.newReadOnlyTransaction();
653 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
654 assertEquals("isPresent", true, optional.isPresent());
655 assertEquals("Data node", outerNode, optional.get());
// Creates nTxs read-write transactions back to back on one transaction chain; each merges
// the same test container and is readied immediately, and the resulting cohorts are
// collected and then iterated.
662 public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
663 new IntegrationTestKit(getSystem()) {{
664 DistributedDataStore dataStore = setupDistributedDataStore(
665 "testCreateChainedTransactionsInQuickSuccession", "test-1");
667 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
669 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Pre-size the list to the number of transactions that will be created.
672 List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
673 for(int i = 0; i < nTxs; i++) {
674 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
676 rwTx.merge(TestModel.TEST_PATH, testNode);
// Ready each Tx right away so the next one can be created on the chain.
678 cohorts.add(rwTx.ready());
682 for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
// Creates two read-write transactions on one chain; the first performs no modifications in
// the code shown here. Verifies that a read of TEST_PATH on the second one completes and
// reports the node as absent.
693 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
694 new IntegrationTestKit(getSystem()) {{
695 DistributedDataStore dataStore = setupDistributedDataStore(
696 "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
698 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
700 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
704 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
// Nothing was written to TEST_PATH, so the read must succeed with an absent result.
706 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
707 assertEquals("isPresent", false, optional.isPresent());
// Verifies that creating any new transaction on a chain throws IllegalStateException while
// the previous write transaction on that chain has not been readied.
716 public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
717 new IntegrationTestKit(getSystem()) {{
718 DistributedDataStore dataStore = setupDistributedDataStore(
719 "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
721 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
723 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
724 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// Write but deliberately do not call ready(), leaving the Tx open on the chain.
726 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
728 // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
731 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
// Verifies that creating any new transaction on a transaction chain throws
// TransactionChainClosedException.
736 public void testCreateChainedTransactionAfterClose() throws Throwable {
737 new IntegrationTestKit(getSystem()) {{
738 DistributedDataStore dataStore = setupDistributedDataStore(
739 "testCreateChainedTransactionAfterClose", "test-1");
741 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
745 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
747 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
// Verifies data change listener behaviour: a SUBTREE listener registered on TEST_PATH gets
// an initial notification for existing data, is notified of later writes beneath that path,
// and receives nothing further for the final write.
752 public void testChangeListenerRegistration() throws Exception{
753 new IntegrationTestKit(getSystem()) {{
754 DistributedDataStore dataStore =
755 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
// Populate TEST_PATH before registering so there is existing data to be notified about.
757 testWriteTransaction(dataStore, TestModel.TEST_PATH,
758 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
760 MockDataChangeListener listener = new MockDataChangeListener(1);
762 ListenerRegistration<MockDataChangeListener>
763 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
764 DataChangeScope.SUBTREE);
766 assertNotNull("registerChangeListener returned null", listenerReg);
768 // Wait for the initial notification
770 listener.waitForChangeEvents(TestModel.TEST_PATH);
// Two further writes: the outer list, then a list entry with ID 1.
776 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
777 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
779 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
780 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
781 testWriteTransaction(dataStore, listPath,
782 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
784 // Wait for the 2 updates.
786 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
// Write one more list entry (ID 2) and check that the listener sees no change for it.
// NOTE(review): the failure message implies the listener registration has been closed by
// this point - confirm.
790 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
791 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
792 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
794 listener.expectNoMoreChanges("Received unexpected change after close");
// Test kit used by every test in this file. It creates a DistributedDataStore from the
// enclosing test's datastoreContextBuilder and provides helpers for writing, committing,
// cleaning up and asserting on expected exceptions.
800 class IntegrationTestKit extends ShardTestKit {
// actorSystem - the actor system supplied by the test (tests pass getSystem()).
802 IntegrationTestKit(ActorSystem actorSystem) {
// Convenience overload: creates the data store and waits for each named shard to become
// leader (delegates with waitUntilLeader = true).
806 DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
807 return setupDistributedDataStore(typeName, true, shardNames);
// Creates a DistributedDataStore configured from "module-shards.conf" / "modules.conf" and
// the enclosing test's datastoreContextBuilder, then pushes the full test schema context.
//
// typeName        - data store type name, set on the shared datastoreContextBuilder.
// waitUntilLeader - when true, wait for each shard in shardNames to exist locally and
//                   become leader before continuing.
// shardNames      - shards to wait for; only consulted when waitUntilLeader is true.
810 DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
811 String... shardNames) {
812 MockClusterWrapper cluster = new MockClusterWrapper();
813 Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
// Static setter: the configuration is installed on ShardStrategyFactory itself.
814 ShardStrategyFactory.setConfiguration(config);
// Mutates the builder shared by the enclosing test instance.
816 datastoreContextBuilder.dataStoreType(typeName);
818 DatastoreContext datastoreContext = datastoreContextBuilder.build();
820 DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
821 config, datastoreContext);
823 SchemaContext schemaContext = SchemaContextHelper.full();
824 dataStore.onGlobalContextUpdated(schemaContext);
826 if(waitUntilLeader) {
827 for(String shardName: shardNames) {
828 ActorRef shard = null;
// Poll for the local shard: up to 100 attempts with a 50 ms sleep each (about 5 seconds).
829 for(int i = 0; i < 20 * 5 && shard == null; i++) {
830 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
831 Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
832 if(shardReply.isPresent()) {
833 shard = shardReply.get();
837 assertNotNull("Shard was not created", shard);
839 waitUntilLeader(shard);
// Writes nodeToWrite at nodePath using a new write-only transaction on dataStore, readies
// it, and then verifies through a new read-only transaction that the node is present and
// equal to what was written.
//
// dataStore   - the data store to write to and read back from.
// nodePath    - path at which the node is written.
// nodeToWrite - the node to write; also the expected value of the read-back.
846 void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
847 NormalizedNode<?, ?> nodeToWrite) throws Exception {
849 // 1. Create a write-only Tx
851 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
852 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
854 // 2. Write some data
856 writeTx.write(nodePath, nodeToWrite);
858 // 3. Ready the Tx for commit
860 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
866 // 5. Verify the data in the store
868 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
870 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
871 assertEquals("isPresent", true, optional.isPresent());
872 assertEquals("Data node", nodeToWrite, optional.get());
// Drives the cohort through the three commit phases in order - canCommit, preCommit,
// commit - blocking on each future. canCommit must return true; it is given 7 seconds, the
// other two phases 5 seconds each.
//
// cohort - the three-phase commit cohort returned by a transaction's ready().
875 void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
876 Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
877 assertEquals("canCommit", true, canCommit);
878 cohort.preCommit().get(5, TimeUnit.SECONDS);
879 cohort.commit().get(5, TimeUnit.SECONDS);
// Sends a PoisonPill to the data store's shard manager actor; the message is sent with a
// null sender.
// NOTE(review): Akka offers ActorRef.noSender() to express "no sender" explicitly - consider
// using it in place of the bare null.
882 void cleanup(DistributedDataStore dataStore) {
883 dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
// Asserts that an exception of exactly type expType is thrown; the test fails with
// "Expected <simple name>" if the fail() call below is reached.
//
// callable - the action expected to throw.
// expType  - the exact exception class expected; the check uses class equality, so a
//            subclass of expType does not satisfy it.
886 void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
890 fail("Expected " + expType.getSimpleName());
// fail() raises an AssertionError, which is not an Exception and so is not caught here.
891 } catch(Exception e) {
892 assertEquals("Exception type", expType, e.getClass());
896 void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
897 Class<? extends Exception> expType) throws Exception {
898 assertExceptionOnCall(new Callable<Void>() {
900 public Void call() throws Exception {
901 txChain.newWriteOnlyTransaction();
906 assertExceptionOnCall(new Callable<Void>() {
908 public Void call() throws Exception {
909 txChain.newReadWriteTransaction();
914 assertExceptionOnCall(new Callable<Void>() {
916 public Void call() throws Exception {
917 txChain.newReadOnlyTransaction();