1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.fail;
6 import akka.actor.ActorRef;
7 import akka.actor.ActorSystem;
8 import akka.actor.PoisonPill;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import com.google.common.util.concurrent.Uninterruptibles;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.CountDownLatch;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicReference;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
22 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
23 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
24 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
25 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
26 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
27 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
28 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
29 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
31 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
// Integration tests that exercise DistributedDataStore transactions, transaction chains and
// change-listener registration using the IntegrationTestKit helper declared in this class.
45 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
// Context builder used by setupDistributedDataStore(); individual tests adjust it (heartbeat
// interval, shard initialization / leader election timeouts) before the context is built.
47 private final DatastoreContext.Builder datastoreContextBuilder =
48 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
// Sets up a data store with the single shard "test-1" and runs the testWriteTransaction helper
// twice: once for the test container and once for an empty outer list.
51 public void testWriteTransactionWithSingleShard() throws Exception{
52 new IntegrationTestKit(getSystem()) {{
53 DistributedDataStore dataStore =
54 setupDistributedDataStore("transactionIntegrationTest", "test-1");
// Write and verify the top-level test container.
56 testWriteTransaction(dataStore, TestModel.TEST_PATH,
57 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Write and verify an outer list built without any entries.
59 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
60 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
// Sets up a data store with the shards "cars-1" and "people-1", writes the empty cars and
// people containers in one write-only Tx, readies it, and reads both paths back.
67 public void testWriteTransactionWithMultipleShards() throws Exception{
68 new IntegrationTestKit(getSystem()) {{
69 DistributedDataStore dataStore =
70 setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
72 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
73 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// First write: the cars base path.
75 YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
76 NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
77 writeTx.write(nodePath1, nodeToWrite1);
// Second write, on the same Tx: the people base path.
79 YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
80 NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
81 writeTx.write(nodePath2, nodeToWrite2);
// Ready the Tx to obtain its three-phase commit cohort.
83 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
87 // Verify the data in the store
89 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// NOTE(review): these two reads use get() without a timeout, unlike the reads in the other tests.
91 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
92 assertEquals("isPresent", true, optional.isPresent());
93 assertEquals("Data node", nodeToWrite1, optional.get());
95 optional = readTx.read(nodePath2).get();
96 assertEquals("isPresent", true, optional.isPresent());
97 assertEquals("Data node", nodeToWrite2, optional.get());
// Writes a node through a read-write Tx, checks that exists() and read() on that same Tx see
// the write, readies the Tx, and finally reads the node back through a new read-only Tx.
104 public void testReadWriteTransaction() throws Exception{
// NOTE(review): this sets a JVM-wide system property; no reset is visible in this method.
105 System.setProperty("shard.persistent", "true");
106 new IntegrationTestKit(getSystem()) {{
107 DistributedDataStore dataStore =
108 setupDistributedDataStore("testReadWriteTransaction", "test-1");
110 // 1. Create a read-write Tx
112 DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
113 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
115 // 2. Write some data
117 YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
118 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
119 readWriteTx.write(nodePath, nodeToWrite );
121 // 3. Read the data from Tx
// Both lookups go through the Tx that performed the write and wait at most 5 seconds.
123 Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
124 assertEquals("exists", true, exists);
126 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
127 assertEquals("isPresent", true, optional.isPresent());
128 assertEquals("Data node", nodeToWrite, optional.get());
130 // 4. Ready the Tx for commit
132 DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
138 // 6. Verify the data in the store
// A fresh read-only Tx from the data store is used for the final verification.
140 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
142 optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
143 assertEquals("isPresent", true, optional.isPresent());
144 assertEquals("Data node", nodeToWrite, optional.get());
// Verifies that modifications made on a Tx while shard recovery is still blocked are applied
// once recovery is released: the Tx is written and readied on a separate thread, the recovery
// latch is counted down, the cohort is committed via doCommit(), and the results are read back.
151 public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
152 new IntegrationTestKit(getSystem()) {{
153 String testName = "testTransactionWritesWithShardNotInitiallyReady";
154 String shardName = "test-1";
156 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
157 // initialized until we create and submit the write Tx.
158 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
159 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
160 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// waitUntilLeader=false: do not wait for the shard, since its recovery is blocked by the latch.
162 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
164 // Create the write Tx
166 // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard.
167 final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
168 assertNotNull("newReadWriteTransaction returned null", writeTx);
170 // Do some modification operations and ready the Tx on a separate thread.
// Path of the outer-list entry with id 1; it is written and then deleted within the same Tx.
172 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
173 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
174 TestModel.ID_QNAME, 1).build();
// Holders used to hand the cohort, or a failure, from the worker thread back to the test thread.
176 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
177 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
178 final CountDownLatch txReady = new CountDownLatch(1);
179 Thread txThread = new Thread() {
183 writeTx.write(TestModel.TEST_PATH,
184 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
186 writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
187 TestModel.OUTER_LIST_QNAME).build());
189 writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
190 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
192 writeTx.delete(listEntryPath);
194 txCohort.set(writeTx.ready());
195 } catch(Exception e) {
206 // Wait for the Tx operations to complete.
// Bounded wait (5 seconds) on the txReady latch; an exception held in caughtEx is rethrown on
// the test thread before the "Tx ready" assertion is evaluated.
208 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
209 if(caughtEx.get() != null) {
210 throw caughtEx.get();
213 assertEquals("Tx ready", true, done);
215 // At this point the Tx operations should be waiting for the shard to initialize so
216 // trigger the latch to let the shard recovery continue.
218 blockRecoveryLatch.countDown();
220 // Wait for the Tx commit to complete.
222 doCommit(txCohort.get());
224 // Verify the data in the store
226 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
228 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
229 get(5, TimeUnit.SECONDS);
230 assertEquals("isPresent", true, optional.isPresent());
232 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
233 assertEquals("isPresent", true, optional.isPresent());
// The list entry was deleted after being written in the same Tx, so it is expected to be absent.
235 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
236 assertEquals("isPresent", false, optional.isPresent());
// Verifies that exists() and read() issued on a read-write Tx while shard recovery is blocked
// complete successfully after the recovery latch is released.
243 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
244 new IntegrationTestKit(getSystem()) {{
245 String testName = "testTransactionReadsWithShardNotInitiallyReady";
246 String shardName = "test-1";
248 // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
249 // initialized until we create the Tx.
250 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
251 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
252 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// waitUntilLeader=false: do not wait for the shard, since its recovery is blocked by the latch.
254 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
256 // Create the read-write Tx
258 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
259 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
261 // Do some reads on the Tx on a separate thread.
// The futures returned by exists()/read() are stored here by the worker thread and only
// resolved on the test thread after the latch has been released.
263 final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
264 new AtomicReference<>();
265 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
266 txReadFuture = new AtomicReference<>();
267 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
268 final CountDownLatch txReadsDone = new CountDownLatch(1);
269 Thread txThread = new Thread() {
// Write first so that the subsequent exists()/read() on the same Tx have data to find.
273 readWriteTx.write(TestModel.TEST_PATH,
274 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
276 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
278 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
279 } catch(Exception e) {
283 txReadsDone.countDown();
290 // Wait for the Tx operations to complete.
// Bounded wait (5 seconds); an exception held in caughtEx is rethrown on the test thread.
292 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
293 if(caughtEx.get() != null) {
294 throw caughtEx.get();
297 assertEquals("Tx reads done", true, done);
299 // At this point the Tx operations should be waiting for the shard to initialize so
300 // trigger the latch to let the shard recovery continue.
302 blockRecoveryLatch.countDown();
304 // Wait for the reads to complete and verify.
306 assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
307 assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
// Expects NotInitializedException: shard recovery stays blocked, the shard initialization
// timeout is lowered to 300 ms, and canCommit() is awaited on a Tx readied against that shard.
315 @Test(expected=NotInitializedException.class)
316 public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
317 new IntegrationTestKit(getSystem()) {{
318 String testName = "testTransactionCommitFailureWithShardNotInitialized";
319 String shardName = "test-1";
321 // Set the shard initialization timeout low for the test.
323 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
325 // Setup the InMemoryJournal to block shard recovery indefinitely.
327 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
328 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
329 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// waitUntilLeader=false: do not wait for the shard, since its recovery is blocked by the latch.
331 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
333 // Create the write Tx
335 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
// NOTE(review): the message below names newReadWriteTransaction, but the Tx above is created
// with newWriteOnlyTransaction().
336 assertNotNull("newReadWriteTransaction returned null", writeTx);
338 // Do some modifications and ready the Tx on a separate thread.
// Holders used to hand the cohort, or a failure, from the worker thread back to the test thread.
340 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
341 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
342 final CountDownLatch txReady = new CountDownLatch(1);
343 Thread txThread = new Thread() {
347 writeTx.write(TestModel.TEST_PATH,
348 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
350 txCohort.set(writeTx.ready());
351 } catch(Exception e) {
362 // Wait for the Tx operations to complete.
// Bounded wait (5 seconds); an exception held in caughtEx is rethrown on the test thread.
364 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
365 if(caughtEx.get() != null) {
366 throw caughtEx.get();
369 assertEquals("Tx ready", true, done);
371 // Wait for the commit to complete. Since the shard never initialized, the Tx should
372 // have timed out and throw an appropriate exception cause.
375 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
376 } catch(ExecutionException e) {
// Release the recovery latch that was registered with the InMemoryJournal above.
379 blockRecoveryLatch.countDown();
// Expects NotInitializedException: shard recovery stays blocked, the shard initialization
// timeout is lowered to 300 ms, and a read issued on a read-write Tx is awaited.
385 @Test(expected=NotInitializedException.class)
386 public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
387 new IntegrationTestKit(getSystem()) {{
388 String testName = "testTransactionReadFailureWithShardNotInitialized";
389 String shardName = "test-1";
391 // Set the shard initialization timeout low for the test.
393 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
395 // Setup the InMemoryJournal to block shard recovery indefinitely.
397 String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
398 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
399 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
// waitUntilLeader=false: do not wait for the shard, since its recovery is blocked by the latch.
401 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
403 // Create the read-write Tx
405 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
406 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
408 // Do a read on the Tx on a separate thread.
// The read future is stored by the worker thread and resolved later on the test thread.
410 final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
411 txReadFuture = new AtomicReference<>();
412 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
413 final CountDownLatch txReadDone = new CountDownLatch(1);
414 Thread txThread = new Thread() {
418 readWriteTx.write(TestModel.TEST_PATH,
419 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
421 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
424 } catch(Exception e) {
428 txReadDone.countDown();
435 // Wait for the Tx operations to complete.
// Bounded wait (5 seconds); an exception held in caughtEx is rethrown on the test thread.
437 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
438 if(caughtEx.get() != null) {
439 throw caughtEx.get();
442 assertEquals("Tx read done", true, done);
444 // Wait for the read to complete. Since the shard never initialized, the Tx should
445 // have timed out and throw an appropriate exception cause.
448 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
449 } catch(ReadFailedException e) {
// Release the recovery latch that was registered with the InMemoryJournal above.
452 blockRecoveryLatch.countDown();
// Expects NoShardLeaderException: the heartbeat interval is raised to 30000 ms, the leader
// election timeout is set to 1 ms, and canCommit() is awaited on a Tx readied on another thread.
458 @Test(expected=NoShardLeaderException.class)
459 public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
460 new IntegrationTestKit(getSystem()) {{
461 String testName = "testTransactionCommitFailureWithNoShardLeader";
462 String shardName = "test-1";
464 // We don't want the shard to become the leader so prevent shard election from completing
465 // by setting the election timeout, which is based on the heartbeat interval, really high.
467 datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
469 // Set the leader election timeout low for the test.
471 datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
// waitUntilLeader=false: setup must not wait for a leader, since none is meant to be elected.
473 DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
475 // Create the write Tx.
477 final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
478 assertNotNull("newReadWriteTransaction returned null", writeTx);
480 // Do some modifications and ready the Tx on a separate thread.
// Holders used to hand the cohort, or a failure, from the worker thread back to the test thread.
482 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
483 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
484 final CountDownLatch txReady = new CountDownLatch(1);
485 Thread txThread = new Thread() {
489 writeTx.write(TestModel.TEST_PATH,
490 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
492 txCohort.set(writeTx.ready());
493 } catch(Exception e) {
504 // Wait for the Tx operations to complete.
// Bounded wait (5 seconds); an exception held in caughtEx is rethrown on the test thread.
506 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
507 if(caughtEx.get() != null) {
508 throw caughtEx.get();
511 assertEquals("Tx ready", true, done);
513 // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
514 // should have timed out and throw an appropriate exception cause.
517 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
518 } catch(ExecutionException e) {
// Readies a write-only Tx, runs canCommit() followed by abort() on its cohort, and then performs
// a regular write of the same path through the testWriteTransaction helper.
527 public void testTransactionAbort() throws Exception{
// NOTE(review): this sets a JVM-wide system property; no reset is visible in this method.
528 System.setProperty("shard.persistent", "true");
529 new IntegrationTestKit(getSystem()) {{
530 DistributedDataStore dataStore =
531 setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
533 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
534 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
536 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
538 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
// Start the commit protocol, then abort instead of continuing; each step waits up to 5 seconds.
540 cohort.canCommit().get(5, TimeUnit.SECONDS);
542 cohort.abort().get(5, TimeUnit.SECONDS);
// A subsequent full write of the same path must still succeed after the abort.
544 testWriteTransaction(dataStore, TestModel.TEST_PATH,
545 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Exercises a transaction chain: data written by a readied (not yet committed) Tx must be
// visible to later transactions created from the same chain, and the data of the last Tx must
// be readable from the data store at the end.
552 public void testTransactionChain() throws Exception{
553 new IntegrationTestKit(getSystem()) {{
554 DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
556 // 1. Create a Tx chain and write-only Tx
558 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
560 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
561 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
563 // 2. Write some data
565 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
566 writeTx.write(TestModel.TEST_PATH, testNode);
568 // 3. Ready the Tx for commit
570 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
572 // 4. Commit the Tx on another thread that first waits for the second read Tx.
// continueCommit1 gates the start of the first commit; commit1Done signals its completion and
// commit1Error carries any failure back to the test thread.
574 final CountDownLatch continueCommit1 = new CountDownLatch(1);
575 final CountDownLatch commit1Done = new CountDownLatch(1);
576 final AtomicReference<Exception> commit1Error = new AtomicReference<>();
581 continueCommit1.await();
583 } catch (Exception e) {
586 commit1Done.countDown();
591 // 5. Create a new read Tx from the chain to read and verify the data from the first
592 // Tx is visible after being readied.
594 DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
595 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
596 assertEquals("isPresent", true, optional.isPresent());
597 assertEquals("Data node", testNode, optional.get());
599 // 6. Create a new RW Tx from the chain, write more data, and ready it
601 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
602 MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
603 rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
605 DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
607 // 7. Create a new read-write Tx from the chain and use it to read the data from the
608 // last RW Tx to verify it is visible.
610 readTx = txChain.newReadWriteTransaction();
611 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
612 assertEquals("isPresent", true, optional.isPresent());
613 assertEquals("Data node", outerNode, optional.get());
615 // 8. Wait for the 2 commits to complete and close the chain.
617 continueCommit1.countDown();
// NOTE(review): the boolean result of this bounded wait is not checked.
618 Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
620 if(commit1Error.get() != null) {
621 throw commit1Error.get();
628 // 9. Create a new read Tx from the data store and verify committed data.
630 readTx = dataStore.newReadOnlyTransaction();
631 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
632 assertEquals("isPresent", true, optional.isPresent());
633 assertEquals("Data node", outerNode, optional.get());
// Creates nTxs read-write transactions from one chain back to back, merging the same test node
// in each and readying it immediately, and collects the resulting cohorts.
640 public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
641 new IntegrationTestKit(getSystem()) {{
642 DistributedDataStore dataStore = setupDistributedDataStore(
643 "testCreateChainedTransactionsInQuickSuccession", "test-1");
645 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
647 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// The cohort list is presized to the number of transactions that will be created.
650 List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
651 for(int i = 0; i < nTxs; i++) {
// Each Tx is readied before the next one is requested from the chain.
652 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
654 rwTx.merge(TestModel.TEST_PATH, testNode);
656 cohorts.add(rwTx.ready());
// Process the collected cohorts in creation order.
660 for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
// Creates two read-write transactions from the same chain and checks that a read of TEST_PATH
// through the second one reports the node as absent.
671 public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
672 new IntegrationTestKit(getSystem()) {{
673 DistributedDataStore dataStore = setupDistributedDataStore(
674 "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
676 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
// No modifications are made on the first Tx in the visible code.
678 DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
682 DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
// Nothing was written to TEST_PATH, so the read is expected to find no data.
684 Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
685 assertEquals("isPresent", false, optional.isPresent());
// Verifies that, while a chained write-only Tx has been written to but not readied, creating
// any further Tx from the chain throws IllegalStateException.
694 public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
695 new IntegrationTestKit(getSystem()) {{
696 DistributedDataStore dataStore = setupDistributedDataStore(
697 "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
699 final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
701 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
702 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// Write without calling ready(), leaving the Tx open on the chain.
704 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
706 // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
709 assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
// Verifies that creating a Tx of any type from a closed transaction chain throws
// TransactionChainClosedException.
714 public void testCreateChainedTransactionAfterClose() throws Throwable {
715 new IntegrationTestKit(getSystem()) {{
716 DistributedDataStore dataStore = setupDistributedDataStore(
717 "testCreateChainedTransactionAfterClose", "test-1");
719 DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
723 // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
725 assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
// Registers a SUBTREE-scoped data change listener on TEST_PATH and checks the notifications it
// receives: the initial one, two for subsequent writes, and none after the final write.
730 public void testChangeListenerRegistration() throws Exception{
731 new IntegrationTestKit(getSystem()) {{
732 DistributedDataStore dataStore =
733 setupDistributedDataStore("testChangeListenerRegistration", "test-1");
// Write the test container before the listener is registered.
735 testWriteTransaction(dataStore, TestModel.TEST_PATH,
736 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
738 MockDataChangeListener listener = new MockDataChangeListener(1);
740 ListenerRegistration<MockDataChangeListener>
741 listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
742 DataChangeScope.SUBTREE);
744 assertNotNull("registerChangeListener returned null", listenerReg);
746 // Wait for the initial notification
748 listener.waitForChangeEvents(TestModel.TEST_PATH);
// Two further writes under the listened subtree: the outer list, then its entry with id 1.
754 testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
755 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
757 YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
758 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
759 testWriteTransaction(dataStore, listPath,
760 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
762 // Wait for the 2 updates.
764 listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
// Write the entry with id 2; per the assertion message below, the listener is expected to have
// been closed by this point and must not be notified.
768 testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
769 nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
770 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
772 listener.expectNoMoreChanges("Received unexpected change after close");
// Test kit shared by the tests above: builds a DistributedDataStore for a test and provides
// helpers for writing, committing, cleanup and exception assertions.
778 class IntegrationTestKit extends ShardTestKit {
// Creates a kit for the given actor system.
780 IntegrationTestKit(ActorSystem actorSystem) {
// Convenience overload: sets up the data store and waits until each named shard has a leader
// (delegates with waitUntilLeader=true).
784 DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
785 return setupDistributedDataStore(typeName, true, shardNames);
// Builds a DistributedDataStore of the given type on the kit's actor system, using a mock
// cluster wrapper, the "module-shards.conf"/"modules.conf" configuration and the enclosing
// test's datastoreContextBuilder, then pushes the full test schema context to it.
//
// typeName        - data store type set on the context builder
// waitUntilLeader - when true, poll for each named local shard and wait for it to have a leader
// shardNames      - shards to wait for; only used when waitUntilLeader is true
788 DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
789 String... shardNames) {
790 MockClusterWrapper cluster = new MockClusterWrapper();
791 Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
// NOTE(review): this is a static setter, so the configuration is shared beyond this instance.
792 ShardStrategyFactory.setConfiguration(config);
794 datastoreContextBuilder.dataStoreType(typeName);
796 DatastoreContext datastoreContext = datastoreContextBuilder.build();
798 DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
799 config, datastoreContext);
801 SchemaContext schemaContext = SchemaContextHelper.full();
802 dataStore.onGlobalContextUpdated(schemaContext);
804 if(waitUntilLeader) {
805 for(String shardName: shardNames) {
806 ActorRef shard = null;
// Poll for the local shard: at most 100 attempts, sleeping 50 ms before each (about 5 seconds).
807 for(int i = 0; i < 20 * 5 && shard == null; i++) {
808 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
809 Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
810 if(shardReply.isPresent()) {
811 shard = shardReply.get();
// Fail the test if the shard never appeared within the polling window.
815 assertNotNull("Shard was not created", shard);
817 waitUntilLeader(shard);
// Writes nodeToWrite at nodePath through a new write-only Tx, readies it, and then verifies via
// a new read-only Tx that the node is present and equal to what was written.
//
// dataStore   - the data store to write to and read from
// nodePath    - path at which the node is written
// nodeToWrite - the node to write and to expect on read-back
824 void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
825 NormalizedNode<?, ?> nodeToWrite) throws Exception {
827 // 1. Create a write-only Tx
829 DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
830 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
832 // 2. Write some data
834 writeTx.write(nodePath, nodeToWrite);
836 // 3. Ready the Tx for commit
838 DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
844 // 5. Verify the data in the store
846 DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// The read waits at most 5 seconds for a result.
848 Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
849 assertEquals("isPresent", true, optional.isPresent());
850 assertEquals("Data node", nodeToWrite, optional.get());
// Drives the cohort through canCommit, preCommit and commit in that order, waiting up to
// 5 seconds for each phase, and asserts that canCommit returned true.
853 void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
854 Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
855 assertEquals("canCommit", true, canCommit);
856 cohort.preCommit().get(5, TimeUnit.SECONDS);
857 cohort.commit().get(5, TimeUnit.SECONDS);
// Sends a PoisonPill to the data store's shard manager actor (with no sender) to stop it.
860 void cleanup(DistributedDataStore dataStore) {
861 dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
// Asserts that an exception of exactly type expType is raised: fail() reports the case where
// nothing was thrown, and a caught exception is compared by class equality, so a subclass of
// expType does not satisfy the assertion.
864 void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
868 fail("Expected " + expType.getSimpleName());
869 } catch(Exception e) {
870 assertEquals("Exception type", expType, e.getClass());
// Checks, via assertExceptionOnCall, that creating each kind of transaction from txChain
// (write-only, read-write, read-only) raises an exception of type expType.
874 void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
875 Class<? extends Exception> expType) throws Exception {
// Write-only Tx creation.
876 assertExceptionOnCall(new Callable<Void>() {
878 public Void call() throws Exception {
879 txChain.newWriteOnlyTransaction();
// Read-write Tx creation.
884 assertExceptionOnCall(new Callable<Void>() {
886 public Void call() throws Exception {
887 txChain.newReadWriteTransaction();
// Read-only Tx creation.
892 assertExceptionOnCall(new Callable<Void>() {
894 public Void call() throws Exception {
895 txChain.newReadOnlyTransaction();