X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreIntegrationTest.java;h=a8384d8758a7bc3523bf75f1f3e31480d21ae737;hb=edcc020c8fda4b13f22a31d79c13feef0b53b0ee;hp=12c566d33de786db62bfad3ec86a56939edbe1c2;hpb=167a09a3b421fba8d10b009d9fa8868a15ab7624;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 12c566d33d..a8384d8758 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -2,12 +2,16 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -16,15 +20,16 @@ import org.junit.Test; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; -import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -142,8 +147,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } - @Test - public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{ + private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionWritesWithShardNotInitiallyReady"; String shardName = "test-1"; @@ -158,7 +162,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modification operations and ready the Tx on a separate thread. @@ -234,7 +239,18 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } @Test - public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{ + public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionWritesWithShardNotInitiallyReady(true); + } + + @Test + public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception { + testTransactionWritesWithShardNotInitiallyReady(false); + } + + @Test + public void testTransactionReadsWithShardNotInitiallyReady() throws Exception { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionReadsWithShardNotInitiallyReady"; String shardName = "test-1"; @@ -449,16 +465,16 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } - @Test(expected=NoShardLeaderException.class) - public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{ + private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionCommitFailureWithNoShardLeader"; - String shardName = "test-1"; + String shardName = "default"; // We don't want the shard to become the leader so prevent shard election from completing // by setting the election timeout, which is based on the heartbeat interval, really high. datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000); + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); // Set the leader election timeout low for the test. @@ -468,7 +484,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx. - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modifications and ready the Tx on a separate thread. @@ -480,8 +497,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Override public void run() { try { - writeTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx.write(TestModel.JUNK_PATH, + ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); txCohort.set(writeTx.ready()); } catch(Exception e) { @@ -517,6 +534,17 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test(expected=NoShardLeaderException.class) + public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionCommitFailureWithNoShardLeader(true); + } + + @Test(expected=NoShardLeaderException.class) + public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable { + testTransactionCommitFailureWithNoShardLeader(false); + } + @Test public void testTransactionAbort() throws Exception{ System.setProperty("shard.persistent", "true"); @@ -601,7 +629,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 7. Create a new read Tx from the chain to read the data from the last RW Tx to // verify it is visible. - readTx = txChain.newReadOnlyTransaction(); + readTx = txChain.newReadWriteTransaction(); optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); assertEquals("Data node", outerNode, optional.get()); @@ -630,6 +658,96 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test + public void testCreateChainedTransactionsInQuickSuccession() throws Exception{ + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionsInQuickSuccession", "test-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + int nTxs = 20; + List cohorts = new ArrayList<>(nTxs); + for(int i = 0; i < nTxs; i++) { + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + + rwTx.merge(TestModel.TEST_PATH, testNode); + + cohorts.add(rwTx.ready()); + + } + + for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { + doCommit(cohort); + } + + txChain.close(); + + cleanup(dataStore); + }}; + } + + @Test + public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{ + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionAfterEmptyTxReadied", "test-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); + + rwTx1.ready(); + + DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); + + Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); + + txChain.close(); + + cleanup(dataStore); + }}; + } + + @Test + public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable { + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionWhenPreviousNotReady", "test-1"); + + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't + // readied. + + assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); + }}; + } + + @Test + public void testCreateChainedTransactionAfterClose() throws Throwable { + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionAfterClose", "test-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + txChain.close(); + + // Try to create another Tx of each type - should fail b/c the previous Tx was closed. + + assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class); + }}; + } + @Test public void testChangeListenerRegistration() throws Exception{ new IntegrationTestKit(getSystem()) {{ @@ -695,8 +813,11 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); ShardStrategyFactory.setConfiguration(config); + datastoreContextBuilder.dataStoreType(typeName); + DatastoreContext datastoreContext = datastoreContextBuilder.build(); - DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster, + + DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, datastoreContext); SchemaContext schemaContext = SchemaContextHelper.full(); @@ -761,6 +882,43 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { void cleanup(DistributedDataStore dataStore) { dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); } + + void assertExceptionOnCall(Callable callable, Class expType) + throws Exception { + try { + callable.call(); + fail("Expected " + expType.getSimpleName()); + } catch(Exception e) { + assertEquals("Exception type", expType, e.getClass()); + } + } + + void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain, + Class expType) throws Exception { + assertExceptionOnCall(new Callable() { + @Override + public Void call() throws Exception { + txChain.newWriteOnlyTransaction(); + return null; + } + }, expType); + + assertExceptionOnCall(new Callable() { + @Override + public Void call() throws Exception { + txChain.newReadWriteTransaction(); + return null; + } + }, expType); + + assertExceptionOnCall(new Callable() { + @Override + public Void call() throws Exception { + txChain.newReadOnlyTransaction(); + return null; + } + }, expType); + } } }