X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreIntegrationTest.java;h=f3d93b896de082534f1ed4e8879117226b3952a6;hp=a8384d8758a7bc3523bf75f1f3e31480d21ae737;hb=daaef05cbf70e6cbec9af181258faead6d9620a6;hpb=a681e6bec3bbd7b536302ee9e083ae04b7f5ebdd diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index a8384d8758..f3d93b896d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -2,30 +2,32 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; -import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.PoisonPill; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import akka.cluster.Cluster; +import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.ConfigFactory; +import java.io.IOException; +import java.math.BigInteger; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; -import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; -import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; -import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -37,19 +39,38 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -public class DistributedDataStoreIntegrationTest extends AbstractActorTest { +public class DistributedDataStoreIntegrationTest { + + private static ActorSystem system; private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100); + @BeforeClass + public static void setUpClass() throws IOException { + system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); + Cluster.get(system).join(member1Address); + } + + @AfterClass + public static void tearDownClass() throws IOException { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + protected ActorSystem getSystem() { + return system; + } + @Test public void testWriteTransactionWithSingleShard() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1"); @@ -65,47 +86,59 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testWriteTransactionWithMultipleShards() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1"); DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); - YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH; - NormalizedNode nodeToWrite1 = CarsModel.emptyContainer(); - writeTx.write(nodePath1, nodeToWrite1); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH; - NormalizedNode nodeToWrite2 = PeopleModel.emptyContainer(); - writeTx.write(nodePath2, nodeToWrite2); + doCommit(writeTx.ready()); - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + writeTx = dataStore.newWriteOnlyTransaction(); - doCommit(cohort); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + doCommit(writeTx.ready()); + + writeTx = dataStore.newWriteOnlyTransaction(); + + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + writeTx.write(carPath, car); + + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + writeTx.write(personPath, person); + + doCommit(writeTx.ready()); // Verify the data in the store DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - Optional> optional = readTx.read(nodePath1).get(); + Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite1, optional.get()); + assertEquals("Data node", car, optional.get()); - optional = readTx.read(nodePath2).get(); + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite2, optional.get()); + assertEquals("Data node", person, optional.get()); cleanup(dataStore); }}; } @Test - public void testReadWriteTransaction() throws Exception{ + public void testReadWriteTransactionWithSingleShard() throws Exception{ System.setProperty("shard.persistent", "true"); - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = - setupDistributedDataStore("testReadWriteTransaction", "test-1"); + setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1"); // 1. Create a read-write Tx @@ -147,9 +180,65 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } - private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception { - new IntegrationTestKit(getSystem()) {{ - String testName = "testTransactionWritesWithShardNotInitiallyReady"; + @Test + public void testReadWriteTransactionWithMultipleShards() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = + setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1"); + + DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); + + readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + doCommit(readWriteTx.ready()); + + readWriteTx = dataStore.newReadWriteTransaction(); + + readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + doCommit(readWriteTx.ready()); + + readWriteTx = dataStore.newReadWriteTransaction(); + + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); + + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.write(personPath, person); + + Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS); + assertEquals("exists", true, exists); + + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + doCommit(readWriteTx.ready()); + + // Verify the data in the store + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + + cleanup(dataStore); + }}; + } + + private void testTransactionWritesWithShardNotInitiallyReady(final String testName, + final boolean writeOnly) throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String shardName = "test-1"; // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't @@ -241,17 +330,17 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception { datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); - testTransactionWritesWithShardNotInitiallyReady(true); + testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true); } @Test public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception { - testTransactionWritesWithShardNotInitiallyReady(false); + testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false); } @Test public void testTransactionReadsWithShardNotInitiallyReady() throws Exception { - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionReadsWithShardNotInitiallyReady"; String shardName = "test-1"; @@ -324,7 +413,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test(expected=NotInitializedException.class) public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionCommitFailureWithShardNotInitialized"; String shardName = "test-1"; @@ -394,7 +483,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test(expected=NotInitializedException.class) public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionReadFailureWithShardNotInitialized"; String shardName = "test-1"; @@ -466,7 +555,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable { - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionCommitFailureWithNoShardLeader"; String shardName = "default"; @@ -548,7 +637,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testTransactionAbort() throws Exception{ System.setProperty("shard.persistent", "true"); - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest", "test-1"); @@ -571,9 +660,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } @Test - public void testTransactionChain() throws Exception{ - new IntegrationTestKit(getSystem()) {{ - DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1"); + public void testTransactionChainWithSingleShard() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1"); // 1. Create a Tx chain and write-only Tx @@ -658,9 +747,74 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test + public void testTransactionChainWithMultipleShards() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards", + "cars-1", "people-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); + + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); + + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.merge(personPath, person); + + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + + DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); + + writeTx = txChain.newWriteOnlyTransaction(); + + //writeTx.delete(personPath); + + DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); + + doCommit(cohort1); + doCommit(cohort2); + doCommit(cohort3); + + txChain.close(); + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + //assertEquals("isPresent", false, optional.isPresent()); + assertEquals("isPresent", true, optional.isPresent()); + + cleanup(dataStore); + }}; + } + @Test public void testCreateChainedTransactionsInQuickSuccession() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionsInQuickSuccession", "test-1"); @@ -691,7 +845,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionAfterEmptyTxReadied", "test-1"); @@ -714,7 +868,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable { - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionWhenPreviousNotReady", "test-1"); @@ -734,7 +888,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testCreateChainedTransactionAfterClose() throws Throwable { - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionAfterClose", "test-1"); @@ -750,7 +904,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testChangeListenerRegistration() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration", "test-1"); @@ -796,129 +950,4 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { cleanup(dataStore); }}; } - - class IntegrationTestKit extends ShardTestKit { - - IntegrationTestKit(ActorSystem actorSystem) { - super(actorSystem); - } - - DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) { - return setupDistributedDataStore(typeName, true, shardNames); - } - - DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader, - String... shardNames) { - MockClusterWrapper cluster = new MockClusterWrapper(); - Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); - ShardStrategyFactory.setConfiguration(config); - - datastoreContextBuilder.dataStoreType(typeName); - - DatastoreContext datastoreContext = datastoreContextBuilder.build(); - - DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, - config, datastoreContext); - - SchemaContext schemaContext = SchemaContextHelper.full(); - dataStore.onGlobalContextUpdated(schemaContext); - - if(waitUntilLeader) { - for(String shardName: shardNames) { - ActorRef shard = null; - for(int i = 0; i < 20 * 5 && shard == null; i++) { - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - Optional shardReply = dataStore.getActorContext().findLocalShard(shardName); - if(shardReply.isPresent()) { - shard = shardReply.get(); - } - } - - assertNotNull("Shard was not created", shard); - - waitUntilLeader(shard); - } - } - - return dataStore; - } - - void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath, - NormalizedNode nodeToWrite) throws Exception { - - // 1. Create a write-only Tx - - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); - - // 2. Write some data - - writeTx.write(nodePath, nodeToWrite); - - // 3. Ready the Tx for commit - - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - - // 4. Commit the Tx - - doCommit(cohort); - - // 5. Verify the data in the store - - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - - Optional> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); - } - - void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception { - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); - } - - void cleanup(DistributedDataStore dataStore) { - dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); - } - - void assertExceptionOnCall(Callable callable, Class expType) - throws Exception { - try { - callable.call(); - fail("Expected " + expType.getSimpleName()); - } catch(Exception e) { - assertEquals("Exception type", expType, e.getClass()); - } - } - - void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain, - Class expType) throws Exception { - assertExceptionOnCall(new Callable() { - @Override - public Void call() throws Exception { - txChain.newWriteOnlyTransaction(); - return null; - } - }, expType); - - assertExceptionOnCall(new Callable() { - @Override - public Void call() throws Exception { - txChain.newReadWriteTransaction(); - return null; - } - }, expType); - - assertExceptionOnCall(new Callable() { - @Override - public Void call() throws Exception { - txChain.newReadOnlyTransaction(); - return null; - } - }, expType); - } - } - }