+ // 2. Write some data
+
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeTx.write(TestModel.TEST_PATH, testNode);
+
+ // 3. Ready the Tx for commit
+
+ final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
+
+ // 4. Commit the Tx on another thread that first waits for the second read Tx.
+
+ final CountDownLatch continueCommit1 = new CountDownLatch(1);
+ final CountDownLatch commit1Done = new CountDownLatch(1);
+ final AtomicReference<Exception> commit1Error = new AtomicReference<>();
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ continueCommit1.await();
+ doCommit(cohort1);
+ } catch (Exception e) {
+ commit1Error.set(e);
+ } finally {
+ commit1Done.countDown();
+ }
+ }
+ }.start();
+
+ // 5. Create a new read Tx from the chain to read and verify the data from the first
+ // Tx is visible after being readied.
+
+ DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
+ Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", testNode, optional.get());
+
+ // 6. Create a new RW Tx from the chain, write more data, and ready it
+
+ DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+ MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
+
+ DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
+
+ // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
+ // verify it is visible.
+
+ readTx = txChain.newReadWriteTransaction();
+ optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", outerNode, optional.get());
+
+ // 8. Wait for the 2 commits to complete and close the chain.
+
+ continueCommit1.countDown();
+ Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
+
+ if(commit1Error.get() != null) {
+ throw commit1Error.get();
+ }
+
+ doCommit(cohort2);
+
+ txChain.close();
+
+ // 9. Create a new read Tx from the data store and verify committed data.
+
+ readTx = dataStore.newReadOnlyTransaction();
+ optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", outerNode, optional.get());
+
+ cleanup(dataStore);
+ }};
+ }
+
+ @Test
+ public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
+ new IntegrationTestKit(getSystem()) {{
+ DistributedDataStore dataStore = setupDistributedDataStore(
+ "testCreateChainedTransactionsInQuickSuccession", "test-1");
+
+ DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ int nTxs = 20;
+ List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
+ for(int i = 0; i < nTxs; i++) {
+ DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+
+ rwTx.merge(TestModel.TEST_PATH, testNode);
+
+ cohorts.add(rwTx.ready());
+
+ }
+
+ for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
+ doCommit(cohort);
+ }
+
+ txChain.close();
+
+ cleanup(dataStore);
+ }};
+ }
+
+ @Test
+ public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
+ new IntegrationTestKit(getSystem()) {{
+ DistributedDataStore dataStore = setupDistributedDataStore(
+ "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
+
+ DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+ DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
+
+ rwTx1.ready();
+
+ DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
+
+ Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", false, optional.isPresent());
+
+ txChain.close();
+
+ cleanup(dataStore);
+ }};
+ }
+
+ @Test
+ public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
+ new IntegrationTestKit(getSystem()) {{
+ DistributedDataStore dataStore = setupDistributedDataStore(
+ "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
+
+ final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+ DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
+ // readied.
+
+ assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
+ }};
+ }
+
+ @Test
+ public void testCreateChainedTransactionAfterClose() throws Throwable {
+ new IntegrationTestKit(getSystem()) {{
+ DistributedDataStore dataStore = setupDistributedDataStore(
+ "testCreateChainedTransactionAfterClose", "test-1");
+
+ DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+ txChain.close();
+
+ // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
+
+ assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
+ }};
+ }
+
+ @Test
+ public void testChangeListenerRegistration() throws Exception{
+ new IntegrationTestKit(getSystem()) {{
+ DistributedDataStore dataStore =
+ setupDistributedDataStore("testChangeListenerRegistration", "test-1");
+
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ MockDataChangeListener listener = new MockDataChangeListener(1);
+
+ ListenerRegistration<MockDataChangeListener>
+ listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
+ DataChangeScope.SUBTREE);
+
+ assertNotNull("registerChangeListener returned null", listenerReg);
+
+ // Wait for the initial notification
+
+ listener.waitForChangeEvents(TestModel.TEST_PATH);
+
+ listener.reset(2);
+
+ // Write 2 updates.
+
+ testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+ YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+ nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+ testWriteTransaction(dataStore, listPath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+ // Wait for the 2 updates.
+
+ listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
+
+ listenerReg.close();
+
+ testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+ nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
+
+ listener.expectNoMoreChanges("Received unexpected change after close");
+
+ cleanup(dataStore);
+ }};
+ }
+
+ class IntegrationTestKit extends ShardTestKit {
+
+ IntegrationTestKit(ActorSystem actorSystem) {
+ super(actorSystem);
+ }
+
+ DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
+ return setupDistributedDataStore(typeName, true, shardNames);
+ }
+
+ DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
+ String... shardNames) {
+ MockClusterWrapper cluster = new MockClusterWrapper();
+ Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
+ ShardStrategyFactory.setConfiguration(config);
+
+ DatastoreContext datastoreContext = datastoreContextBuilder.build();
+ DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
+ config, datastoreContext);
+
+ SchemaContext schemaContext = SchemaContextHelper.full();
+ dataStore.onGlobalContextUpdated(schemaContext);
+
+ if(waitUntilLeader) {
+ for(String shardName: shardNames) {
+ ActorRef shard = null;
+ for(int i = 0; i < 20 * 5 && shard == null; i++) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
+ if(shardReply.isPresent()) {
+ shard = shardReply.get();