X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreIntegrationTest.java;h=f3d93b896de082534f1ed4e8879117226b3952a6;hb=2f7c93174d7834a4c4aedacc9b88aa53a5a0422c;hp=cec7ce1e3fc250a84231f84e83d97996bbe2e55a;hpb=cd81eb73b7abf677571b2366425ccbc8d794f4b6;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index cec7ce1e3f..f3d93b896d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -1,24 +1,37 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorRef; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import akka.actor.ActorSystem; -import akka.actor.PoisonPill; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import akka.cluster.Cluster; +import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.ConfigFactory; +import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; -import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; -import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; -import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; -import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -26,25 +39,38 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +public class DistributedDataStoreIntegrationTest { -public class DistributedDataStoreIntegrationTest extends AbstractActorTest { + private static ActorSystem system; private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100); + @BeforeClass + public static void setUpClass() throws IOException { + system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); + Cluster.get(system).join(member1Address); + } + + @AfterClass + public static void tearDownClass() throws IOException { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + protected ActorSystem getSystem() { + return system; + } + @Test public void testWriteTransactionWithSingleShard() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1"); @@ -60,50 +86,59 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testWriteTransactionWithMultipleShards() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1"); DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); - YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH; - NormalizedNode nodeToWrite1 = CarsModel.emptyContainer(); - writeTx.write(nodePath1, nodeToWrite1); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH; - NormalizedNode nodeToWrite2 = PeopleModel.emptyContainer(); - writeTx.write(nodePath2, nodeToWrite2); + doCommit(writeTx.ready()); - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + writeTx = dataStore.newWriteOnlyTransaction(); + + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + doCommit(writeTx.ready()); - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + writeTx = dataStore.newWriteOnlyTransaction(); + + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + writeTx.write(carPath, car); + + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + writeTx.write(personPath, person); + + doCommit(writeTx.ready()); // Verify the data in the store DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - Optional> optional = readTx.read(nodePath1).get(); + Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite1, optional.get()); + assertEquals("Data node", car, optional.get()); - optional = readTx.read(nodePath2).get(); + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite2, optional.get()); + assertEquals("Data node", person, optional.get()); cleanup(dataStore); }}; } @Test - public void testReadWriteTransaction() throws Exception{ + public void testReadWriteTransactionWithSingleShard() throws Exception{ System.setProperty("shard.persistent", "true"); - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = - setupDistributedDataStore("testReadWriteTransaction", "test-1"); + setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1"); // 1. Create a read-write Tx @@ -131,10 +166,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 5. Commit the Tx - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + doCommit(cohort); // 6. Verify the data in the store @@ -149,9 +181,64 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } @Test - public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{ - new IntegrationTestKit(getSystem()) {{ - String testName = "testTransactionWritesWithShardNotInitiallyReady"; + public void testReadWriteTransactionWithMultipleShards() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = + setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1"); + + DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); + + readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + doCommit(readWriteTx.ready()); + + readWriteTx = dataStore.newReadWriteTransaction(); + + readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + doCommit(readWriteTx.ready()); + + readWriteTx = dataStore.newReadWriteTransaction(); + + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); + + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.write(personPath, person); + + Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS); + assertEquals("exists", true, exists); + + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + doCommit(readWriteTx.ready()); + + // Verify the data in the store + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + + cleanup(dataStore); + }}; + } + + private void testTransactionWritesWithShardNotInitiallyReady(final String testName, + final boolean writeOnly) throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String shardName = "test-1"; // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't @@ -164,7 +251,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modification operations and ready the Tx on a separate thread. @@ -219,9 +307,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Wait for the Tx commit to complete. - assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS)); - txCohort.get().preCommit().get(5, TimeUnit.SECONDS); - txCohort.get().commit().get(5, TimeUnit.SECONDS); + doCommit(txCohort.get()); // Verify the data in the store @@ -242,8 +328,19 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } @Test - public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true); + } + + @Test + public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception { + testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false); + } + + @Test + public void testTransactionReadsWithShardNotInitiallyReady() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionReadsWithShardNotInitiallyReady"; String shardName = "test-1"; @@ -316,7 +413,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test(expected=NotInitializedException.class) public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionCommitFailureWithShardNotInitialized"; String shardName = "test-1"; @@ -386,7 +483,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test(expected=NotInitializedException.class) public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionReadFailureWithShardNotInitialized"; String shardName = "test-1"; @@ -457,16 +554,16 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } - @Test(expected=NoShardLeaderException.class) - public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{ - new IntegrationTestKit(getSystem()) {{ + private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionCommitFailureWithNoShardLeader"; - String shardName = "test-1"; + String shardName = "default"; // We don't want the shard to become the leader so prevent shard election from completing // by setting the election timeout, which is based on the heartbeat interval, really high. datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000); + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); // Set the leader election timeout low for the test. @@ -476,7 +573,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx. - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modifications and ready the Tx on a separate thread. @@ -488,8 +586,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Override public void run() { try { - writeTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx.write(TestModel.JUNK_PATH, + ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); txCohort.set(writeTx.ready()); } catch(Exception e) { @@ -525,10 +623,21 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test(expected=NoShardLeaderException.class) + public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionCommitFailureWithNoShardLeader(true); + } + + @Test(expected=NoShardLeaderException.class) + public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable { + testTransactionCommitFailureWithNoShardLeader(false); + } + @Test public void testTransactionAbort() throws Exception{ System.setProperty("shard.persistent", "true"); - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest", "test-1"); @@ -551,11 +660,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } @Test - public void testTransactionChain() throws Exception{ - System.setProperty("shard.persistent", "true"); - new IntegrationTestKit(getSystem()) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("transactionChainIntegrationTest", "test-1"); + public void testTransactionChainWithSingleShard() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1"); // 1. Create a Tx chain and write-only Tx @@ -566,151 +673,281 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 2. Write some data - NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - writeTx.write(TestModel.TEST_PATH, containerNode); + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx.write(TestModel.TEST_PATH, testNode); // 3. Ready the Tx for commit - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - // 4. Commit the Tx + // 4. Commit the Tx on another thread that first waits for the second read Tx. - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + final CountDownLatch continueCommit1 = new CountDownLatch(1); + final CountDownLatch commit1Done = new CountDownLatch(1); + final AtomicReference commit1Error = new AtomicReference<>(); + new Thread() { + @Override + public void run() { + try { + continueCommit1.await(); + doCommit(cohort1); + } catch (Exception e) { + commit1Error.set(e); + } finally { + commit1Done.countDown(); + } + } + }.start(); - // 5. Verify the data in the store + // 5. Create a new read Tx from the chain to read and verify the data from the first + // Tx is visible after being readied. DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); - Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", containerNode, optional.get()); + assertEquals("Data node", testNode, optional.get()); + + // 6. Create a new RW Tx from the chain, write more data, and ready it + + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); + rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); + + DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); + + // 7. Create a new read Tx from the chain to read the data from the last RW Tx to + // verify it is visible. + + readTx = txChain.newReadWriteTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + + // 8. Wait for the 2 commits to complete and close the chain. + + continueCommit1.countDown(); + Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); + + if(commit1Error.get() != null) { + throw commit1Error.get(); + } + + doCommit(cohort2); txChain.close(); + // 9. Create a new read Tx from the data store and verify committed data. + + readTx = dataStore.newReadOnlyTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + cleanup(dataStore); }}; } @Test - public void testChangeListenerRegistration() throws Exception{ - new IntegrationTestKit(getSystem()) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("testChangeListenerRegistration", "test-1"); + public void testTransactionChainWithMultipleShards() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards", + "cars-1", "people-1"); - MockDataChangeListener listener = new MockDataChangeListener(3); + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - ListenerRegistration - listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener, - DataChangeScope.SUBTREE); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - assertNotNull("registerChangeListener returned null", listenerReg); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). - nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); - testWriteTransaction(dataStore, listPath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); - listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath ); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); - listenerReg.close(); + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.merge(personPath, person); - testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). - nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - listener.expectNoMoreChanges("Received unexpected change after close"); + optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + + DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); + + writeTx = txChain.newWriteOnlyTransaction(); + + //writeTx.delete(personPath); + + DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); + + doCommit(cohort1); + doCommit(cohort2); + doCommit(cohort3); + + txChain.close(); + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + //assertEquals("isPresent", false, optional.isPresent()); + assertEquals("isPresent", true, optional.isPresent()); cleanup(dataStore); }}; } - class IntegrationTestKit extends ShardTestKit { - - IntegrationTestKit(ActorSystem actorSystem) { - super(actorSystem); - } - - DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) { - return setupDistributedDataStore(typeName, true, shardNames); - } - - DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader, - String... shardNames) { - MockClusterWrapper cluster = new MockClusterWrapper(); - Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); - ShardStrategyFactory.setConfiguration(config); - - DatastoreContext datastoreContext = datastoreContextBuilder.build(); - DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster, - config, datastoreContext); - - SchemaContext schemaContext = SchemaContextHelper.full(); - dataStore.onGlobalContextUpdated(schemaContext); - - if(waitUntilLeader) { - for(String shardName: shardNames) { - ActorRef shard = null; - for(int i = 0; i < 20 * 5 && shard == null; i++) { - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - Optional shardReply = dataStore.getActorContext().findLocalShard(shardName); - if(shardReply.isPresent()) { - shard = shardReply.get(); - } - } + @Test + public void testCreateChainedTransactionsInQuickSuccession() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionsInQuickSuccession", "test-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - assertNotNull("Shard was not created", shard); + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + int nTxs = 20; + List cohorts = new ArrayList<>(nTxs); + for(int i = 0; i < nTxs; i++) { + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + + rwTx.merge(TestModel.TEST_PATH, testNode); + + cohorts.add(rwTx.ready()); - waitUntilLeader(shard); - } } - return dataStore; - } + for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { + doCommit(cohort); + } - void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath, - NormalizedNode nodeToWrite) throws Exception { + txChain.close(); - // 1. Create a write-only Tx + cleanup(dataStore); + }}; + } - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + @Test + public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionAfterEmptyTxReadied", "test-1"); - // 2. Write some data + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - writeTx.write(nodePath, nodeToWrite); + DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); - // 3. Ready the Tx for commit + rwTx1.ready(); - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); - // 4. Commit the Tx + Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + txChain.close(); - // 5. Verify the data in the store + cleanup(dataStore); + }}; + } - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + @Test + public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionWhenPreviousNotReady", "test-1"); - Optional> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); - } + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - void cleanup(DistributedDataStore dataStore) { - dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); - } + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't + // readied. + + assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); + }}; + } + + @Test + public void testCreateChainedTransactionAfterClose() throws Throwable { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionAfterClose", "test-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + txChain.close(); + + // Try to create another Tx of each type - should fail b/c the previous Tx was closed. + + assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class); + }}; } + @Test + public void testChangeListenerRegistration() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = + setupDistributedDataStore("testChangeListenerRegistration", "test-1"); + + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + MockDataChangeListener listener = new MockDataChangeListener(1); + + ListenerRegistration + listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener, + DataChangeScope.SUBTREE); + + assertNotNull("registerChangeListener returned null", listenerReg); + + // Wait for the initial notification + + listener.waitForChangeEvents(TestModel.TEST_PATH); + + listener.reset(2); + + // Write 2 updates. + + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). + nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); + testWriteTransaction(dataStore, listPath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + + // Wait for the 2 updates. + + listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); + + listenerReg.close(); + + testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). + nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); + + listener.expectNoMoreChanges("Received unexpected change after close"); + + cleanup(dataStore); + }}; + } }