X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreIntegrationTest.java;h=9f5aded3521b7c72f9b50fff77a988263f1e039d;hb=4061a49630bd90e2a4839fc5f5622e1faaebfd67;hp=cec7ce1e3fc250a84231f84e83d97996bbe2e55a;hpb=a564badf85498c6f2c85c831dcb6319953214b4f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index cec7ce1e3f..9f5aded352 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -1,11 +1,21 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -19,6 +29,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -26,16 +37,10 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @@ -77,10 +82,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + doCommit(cohort); // Verify the data in the store @@ -131,10 +133,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 5. Commit the Tx - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + doCommit(cohort); // 6. Verify the data in the store @@ -219,9 +218,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Wait for the Tx commit to complete. - assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS)); - txCohort.get().preCommit().get(5, TimeUnit.SECONDS); - txCohort.get().commit().get(5, TimeUnit.SECONDS); + doCommit(txCohort.get()); // Verify the data in the store @@ -552,10 +549,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testTransactionChain() throws Exception{ - System.setProperty("shard.persistent", "true"); new IntegrationTestKit(getSystem()) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("transactionChainIntegrationTest", "test-1"); + DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1"); // 1. Create a Tx chain and write-only Tx @@ -566,27 +561,104 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 2. Write some data - NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - writeTx.write(TestModel.TEST_PATH, containerNode); + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx.write(TestModel.TEST_PATH, testNode); // 3. Ready the Tx for commit - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - // 4. Commit the Tx + // 4. Commit the Tx on another thread that first waits for the second read Tx. - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + final CountDownLatch continueCommit1 = new CountDownLatch(1); + final CountDownLatch commit1Done = new CountDownLatch(1); + final AtomicReference commit1Error = new AtomicReference<>(); + new Thread() { + @Override + public void run() { + try { + continueCommit1.await(); + doCommit(cohort1); + } catch (Exception e) { + commit1Error.set(e); + } finally { + commit1Done.countDown(); + } + } + }.start(); - // 5. Verify the data in the store + // 5. Create a new read Tx from the chain to read and verify the data from the first + // Tx is visible after being readied. DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); - Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", containerNode, optional.get()); + assertEquals("Data node", testNode, optional.get()); + + // 6. Create a new RW Tx from the chain, write more data, and ready it + + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); + rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); + + DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); + + // 7. Create a new read Tx from the chain to read the data from the last RW Tx to + // verify it is visible. + + readTx = txChain.newReadWriteTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + + // 8. Wait for the 2 commits to complete and close the chain. + + continueCommit1.countDown(); + Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); + + if(commit1Error.get() != null) { + throw commit1Error.get(); + } + + doCommit(cohort2); + + txChain.close(); + + // 9. Create a new read Tx from the data store and verify committed data. + + readTx = dataStore.newReadOnlyTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + + cleanup(dataStore); + }}; + } + + @Test + public void testCreateChainedTransactionsInQuickSuccession() throws Exception{ + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionsInQuickSuccession", "test-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + int nTxs = 20; + List cohorts = new ArrayList<>(nTxs); + for(int i = 0; i < nTxs; i++) { + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + + rwTx.merge(TestModel.TEST_PATH, testNode); + + cohorts.add(rwTx.ready()); + + } + + for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { + doCommit(cohort); + } txChain.close(); @@ -594,13 +666,75 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test + public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{ + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionAfterEmptyTxReadied", "test-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); + + rwTx1.ready(); + + DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); + + Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); + + txChain.close(); + + cleanup(dataStore); + }}; + } + + @Test + public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable { + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionWhenPreviousNotReady", "test-1"); + + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't + // readied. + + assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); + }}; + } + + @Test + public void testCreateChainedTransactionAfterClose() throws Throwable { + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionAfterClose", "test-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + txChain.close(); + + // Try to create another Tx of each type - should fail b/c the previous Tx was closed. + + assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class); + }}; + } + @Test public void testChangeListenerRegistration() throws Exception{ new IntegrationTestKit(getSystem()) {{ DistributedDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration", "test-1"); - MockDataChangeListener listener = new MockDataChangeListener(3); + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + MockDataChangeListener listener = new MockDataChangeListener(1); ListenerRegistration listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener, @@ -608,8 +742,13 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { assertNotNull("registerChangeListener returned null", listenerReg); - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + // Wait for the initial notification + + listener.waitForChangeEvents(TestModel.TEST_PATH); + + listener.reset(2); + + // Write 2 updates. testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); @@ -619,7 +758,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { testWriteTransaction(dataStore, listPath, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath ); + // Wait for the 2 updates. + + listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); listenerReg.close(); @@ -694,10 +835,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 4. Commit the Tx - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + doCommit(cohort); // 5. Verify the data in the store @@ -708,9 +846,53 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { assertEquals("Data node", nodeToWrite, optional.get()); } + void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception { + Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); + } + void cleanup(DistributedDataStore dataStore) { dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); } + + void assertExceptionOnCall(Callable callable, Class expType) + throws Exception { + try { + callable.call(); + fail("Expected " + expType.getSimpleName()); + } catch(Exception e) { + assertEquals("Exception type", expType, e.getClass()); + } + } + + void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain, + Class expType) throws Exception { + assertExceptionOnCall(new Callable() { + @Override + public Void call() throws Exception { + txChain.newWriteOnlyTransaction(); + return null; + } + }, expType); + + assertExceptionOnCall(new Callable() { + @Override + public Void call() throws Exception { + txChain.newReadWriteTransaction(); + return null; + } + }, expType); + + assertExceptionOnCall(new Callable() { + @Override + public Void call() throws Exception { + txChain.newReadOnlyTransaction(); + return null; + } + }, expType); + } } }