X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreIntegrationTest.java;h=1cc7ae8ad02f93dd9f80135a47c1b5dfdfc2b58c;hp=116e5e75b50d261e3d9673378ee643cfe2f2e31f;hb=846b5ae74588943cc5bd176e219809ced07104d6;hpb=b35aa25e0c3a91fc71e778d9c9393e91036246e3 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 116e5e75b5..1cc7ae8ad0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -1,120 +1,323 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; +import akka.actor.PoisonPill; import com.google.common.base.Optional; -import com.google.common.util.concurrent.ListenableFuture; -import org.junit.After; -import org.junit.Before; +import com.google.common.util.concurrent.Uninterruptibles; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; -import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import java.util.concurrent.TimeUnit; -import java.util.concurrent.ExecutionException; +public class DistributedDataStoreIntegrationTest extends AbstractActorTest { -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; + @Test + public void testWriteTransactionWithSingleShard() throws Exception{ + System.setProperty("shard.persistent", "true"); + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = + setupDistributedDataStore("transactionIntegrationTest", "test-1"); -public class DistributedDataStoreIntegrationTest{ + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - private static ActorSystem system; + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - @Before - public void setUp() { - System.setProperty("shard.persistent", "false"); - system = ActorSystem.create("test"); + cleanup(dataStore); + }}; } - @After - public void tearDown() { - JavaTestKit.shutdownActorSystem(system); - system = null; - } + @Test + public void testWriteTransactionWithMultipleShards() throws Exception{ + System.setProperty("shard.persistent", "true"); + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = + setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1"); + + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH; + NormalizedNode nodeToWrite1 = CarsModel.emptyContainer(); + writeTx.write(nodePath1, nodeToWrite1); + + YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH; + NormalizedNode nodeToWrite2 = PeopleModel.emptyContainer(); + writeTx.write(nodePath2, nodeToWrite2); + + DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + + Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); - protected ActorSystem getSystem() { - return system; + // 5. Verify the data in the store + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + Optional> optional = readTx.read(nodePath1).get(); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite1, optional.get()); + + optional = readTx.read(nodePath2).get(); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite2, optional.get()); + + cleanup(dataStore); + }}; } @Test - public void integrationTest() throws Exception { - ShardStrategyFactory.setConfiguration(new MockConfiguration()); - DistributedDataStore distributedDataStore = - new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), new MockConfiguration()); + public void testReadWriteTransaction() throws Exception{ + System.setProperty("shard.persistent", "true"); + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = + setupDistributedDataStore("testReadWriteTransaction", "test-1"); - distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext()); + // 1. Create a read-write Tx - DOMStoreReadWriteTransaction transaction = - distributedDataStore.newReadWriteTransaction(); + DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); - transaction.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + // 2. Write some data - ListenableFuture>> future = - transaction.read(TestModel.TEST_PATH); + YangInstanceIdentifier nodePath = TestModel.TEST_PATH; + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + readWriteTx.write(nodePath, nodeToWrite ); - Optional> optional = future.get(); + // 3. Read the data from Tx - NormalizedNode normalizedNode = optional.get(); + Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS); + assertEquals("exists", true, exists); - assertEquals(TestModel.TEST_QNAME, normalizedNode.getNodeType()); + Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); - DOMStoreThreePhaseCommitCohort ready = transaction.ready(); + // 4. Ready the Tx for commit - ListenableFuture canCommit = ready.canCommit(); + DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); - assertTrue(canCommit.get()); + // 5. Commit the Tx - ListenableFuture preCommit = ready.preCommit(); + Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); - preCommit.get(); + // 6. Verify the data in the store - ListenableFuture commit = ready.commit(); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - commit.get(); + optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); + cleanup(dataStore); + }}; } + @Test + public void testTransactionAbort() throws Exception{ + System.setProperty("shard.persistent", "true"); + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = + setupDistributedDataStore("transactionAbortIntegrationTest", "test-1"); + + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + + cohort.canCommit().get(5, TimeUnit.SECONDS); + + cohort.abort().get(5, TimeUnit.SECONDS); + + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + cleanup(dataStore); + }}; + } @Test - public void integrationTestWithMultiShardConfiguration() - throws ExecutionException, InterruptedException { - Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf"); + public void testTransactionChain() throws Exception{ + System.setProperty("shard.persistent", "true"); + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = + setupDistributedDataStore("transactionChainIntegrationTest", "test-1"); + + // 1. Create a Tx chain and write-only Tx + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + // 2. Write some data + + NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx.write(TestModel.TEST_PATH, containerNode); + + // 3. Ready the Tx for commit + + DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + + // 4. Commit the Tx + + Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); + + // 5. Verify the data in the store + + DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); + + Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", containerNode, optional.get()); + + txChain.close(); + + cleanup(dataStore); + }}; + } + + @Test + public void testChangeListenerRegistration() throws Exception{ + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = + setupDistributedDataStore("testChangeListenerRegistration", "test-1"); + + MockDataChangeListener listener = new MockDataChangeListener(3); + + ListenerRegistration + listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener, + DataChangeScope.SUBTREE); + + assertNotNull("registerChangeListener returned null", listenerReg); + + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). + nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); + testWriteTransaction(dataStore, listPath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + + listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath ); + + listenerReg.close(); + + testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). + nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); + + listener.expectNoMoreChanges("Received unexpected change after close"); + + cleanup(dataStore); + }}; + } + + class IntegrationTestKit extends ShardTestKit { + + IntegrationTestKit(ActorSystem actorSystem) { + super(actorSystem); + } + + DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) { + MockClusterWrapper cluster = new MockClusterWrapper(); + Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); + ShardStrategyFactory.setConfiguration(config); + + DatastoreContext datastoreContext = DatastoreContext.newBuilder().build(); + DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster, + config, datastoreContext); + + SchemaContext schemaContext = SchemaContextHelper.full(); + dataStore.onGlobalContextUpdated(schemaContext); + + for(String shardName: shardNames) { + ActorRef shard = null; + for(int i = 0; i < 20 * 5 && shard == null; i++) { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + Optional shardReply = dataStore.getActorContext().findLocalShard(shardName); + if(shardReply.isPresent()) { + shard = shardReply.get(); + } + } + + assertNotNull("Shard was not created", shard); + + System.out.println("!!!!!!shard: "+shard.path().toString()); + waitUntilLeader(shard); + } + + return dataStore; + } - ShardStrategyFactory.setConfiguration(configuration); - DistributedDataStore distributedDataStore = - new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration); + void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath, + NormalizedNode nodeToWrite) throws Exception { + // 1. Create a write-only Tx - distributedDataStore.onGlobalContextUpdated(SchemaContextHelper.full()); + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - DOMStoreReadWriteTransaction transaction = - distributedDataStore.newReadWriteTransaction(); + // 2. Write some data - transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + writeTx.write(nodePath, nodeToWrite); - DOMStoreThreePhaseCommitCohort ready = transaction.ready(); + // 3. Ready the Tx for commit - ListenableFuture canCommit = ready.canCommit(); + DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - assertTrue(canCommit.get()); + // 4. Commit the Tx - ListenableFuture preCommit = ready.preCommit(); + Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); - preCommit.get(); + // 5. Verify the data in the store - ListenableFuture commit = ready.commit(); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - commit.get(); + Optional> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); + } + void cleanup(DistributedDataStore dataStore) { + dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); + } } }