From: Tomas Cere Date: Wed, 13 Mar 2019 11:50:55 +0000 (+0100) Subject: Add integration test for segmented journal X-Git-Tag: release/sodium~133 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=628e4c6fa9bd55ea49eda4cd8e0a5cfcfdfa7dc5 Add integration test for segmented journal Extracts test cases for DistributedDataStore that do not need latching in InMemoryJournal/Snapshot so we can have these run with segmented journal. Change-Id: I0ea4e58bff0f0e8e1a40bd4f26b827623139d6c9 Signed-off-by: Tomas Cere --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index 2753ac2071..19bd47553a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -141,6 +141,11 @@ org.opendaylight.controller sal-core-compat + + org.opendaylight.controller + sal-akka-segmented-journal + test + org.opendaylight.mdsal @@ -208,6 +213,23 @@ mdsal-dom-inmemory-datastore + + io.atomix + atomix-storage + 3.1.5 + test + + + io.atomix + atomix-utils + 3.1.5 + test + + + commons-io + commons-io + test + commons-lang commons-lang diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java new file mode 100644 index 0000000000..62986b2ebf --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java @@ -0,0 +1,874 @@ +/* + * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.runners.Parameterized.Parameter; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import akka.actor.ActorSystem; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; +import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; +import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; +import org.opendaylight.mdsal.dom.spi.store.DOMStore; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; + +public abstract class AbstractDistributedDataStoreIntegrationTest { + + @Parameter + public Class testParameter; + + protected ActorSystem system; + + protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(100); + + protected ActorSystem getSystem() { + return system; + } + + @Test + public void testWriteTransactionWithSingleShard() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "transactionIntegrationTest", "test-1")) { + + testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42)) + .build()); + } + } + + @Test + public void testWriteTransactionWithMultipleShards() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) { + + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + testKit.doCommit(writeTx.ready()); + + writeTx = dataStore.newWriteOnlyTransaction(); + + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + testKit.doCommit(writeTx.ready()); + + writeTx = dataStore.newWriteOnlyTransaction(); + + final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + writeTx.write(carPath, car); + + final MapEntryNode person = PeopleModel.newPersonEntry("jack"); + final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + writeTx.write(personPath, person); + + testKit.doCommit(writeTx.ready()); + + // Verify the data in the store + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", person, optional.get()); + } + } + + @Test + public void testReadWriteTransactionWithSingleShard() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) { + + // 1. Create a read-write Tx + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); + + // 2. Write some data + final YangInstanceIdentifier nodePath = TestModel.TEST_PATH; + final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + readWriteTx.write(nodePath, nodeToWrite); + + // 3. Read the data from Tx + final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("exists", Boolean.TRUE, exists); + + Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); + + // 4. Ready the Tx for commit + final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); + + // 5. Commit the Tx + testKit.doCommit(cohort); + + // 6. Verify the data in the store + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); + } + } + + @Test + public void testReadWriteTransactionWithMultipleShards() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) { + + DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); + + readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + testKit.doCommit(readWriteTx.ready()); + + readWriteTx = dataStore.newReadWriteTransaction(); + + readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + testKit.doCommit(readWriteTx.ready()); + + readWriteTx = dataStore.newReadWriteTransaction(); + + final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); + + final MapEntryNode person = PeopleModel.newPersonEntry("jack"); + final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.write(personPath, person); + + final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS); + assertEquals("exists", Boolean.TRUE, exists); + + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + testKit.doCommit(readWriteTx.ready()); + + // Verify the data in the store + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", person, optional.get()); + } + } + + @Test + public void testSingleTransactionsWritesInQuickSuccession() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) { + + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + testKit.doCommit(writeTx.ready()); + + writeTx = txChain.newWriteOnlyTransaction(); + + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx.write(CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + } + + testKit.doCommit(writeTx.ready()); + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + } + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) + throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + final String shardName = "default"; + + // We don't want the shard to become the leader so prevent shard + // elections. + datastoreContextBuilder.customRaftPolicyImplementation( + "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy"); + + // The ShardManager uses the election timeout for FindPrimary so + // reset it low so it will timeout quickly. + datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1) + .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2); + + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) { + + final Object result = dataStore.getActorUtils().executeOperation( + dataStore.getActorUtils().getShardManager(), new FindLocalShard(shardName, true)); + assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); + + // Create the write Tx. + DOMStoreWriteTransaction writeTxToClose = null; + try { + writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction() + : dataStore.newReadWriteTransaction(); + final DOMStoreWriteTransaction writeTx = writeTxToClose; + assertNotNull("newReadWriteTransaction returned null", writeTx); + + // Do some modifications and ready the Tx on a separate + // thread. + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + final Thread txThread = new Thread(() -> { + try { + writeTx.write(TestModel.JUNK_PATH, + ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); + + txCohort.set(writeTx.ready()); + } catch (Exception e) { + caughtEx.set(e); + } finally { + txReady.countDown(); + } + }); + + txThread.start(); + + // Wait for the Tx operations to complete. + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if (caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertTrue("Tx ready", done); + + // Wait for the commit to complete. Since no shard + // leader was elected in time, the Tx + // should have timed out and throw an appropriate + // exception cause. + try { + txCohort.get().canCommit().get(10, TimeUnit.SECONDS); + fail("Expected NoShardLeaderException"); + } catch (final ExecutionException e) { + final String msg = "Unexpected exception: " + + Throwables.getStackTraceAsString(e.getCause()); + if (DistributedDataStore.class.equals(testParameter)) { + assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException); + } else { + assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); + } + } + } finally { + try { + if (writeTxToClose != null) { + writeTxToClose.close(); + } + } catch (Exception e) { + // FIXME TransactionProxy.close throws IllegalStateException: + // Transaction is ready, it cannot be closed + } + } + } + } + + @Test + public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader"); + } + + @Test + public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception { + testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader"); + } + + @Test + public void testTransactionAbort() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "transactionAbortIntegrationTest", "test-1")) { + + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + + cohort.canCommit().get(5, TimeUnit.SECONDS); + + cohort.abort().get(5, TimeUnit.SECONDS); + + testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + } + } + + @Test + @SuppressWarnings("checkstyle:IllegalCatch") + public void testTransactionChainWithSingleShard() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testTransactionChainWithSingleShard", "test-1")) { + + // 1. Create a Tx chain and write-only Tx + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + // 2. Write some data + final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx.write(TestModel.TEST_PATH, testNode); + + // 3. Ready the Tx for commit + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + // 4. Commit the Tx on another thread that first waits for + // the second read Tx. + final CountDownLatch continueCommit1 = new CountDownLatch(1); + final CountDownLatch commit1Done = new CountDownLatch(1); + final AtomicReference commit1Error = new AtomicReference<>(); + new Thread(() -> { + try { + continueCommit1.await(); + testKit.doCommit(cohort1); + } catch (Exception e) { + commit1Error.set(e); + } finally { + commit1Done.countDown(); + } + }).start(); + + // 5. Create a new read Tx from the chain to read and verify + // the data from the first + // Tx is visible after being readied. + DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); + Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", testNode, optional.get()); + + // 6. Create a new RW Tx from the chain, write more data, + // and ready it + final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42)) + .build(); + rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); + + final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); + + // 7. Create a new read Tx from the chain to read the data + // from the last RW Tx to + // verify it is visible. + readTx = txChain.newReadWriteTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + + // 8. Wait for the 2 commits to complete and close the + // chain. + continueCommit1.countDown(); + Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); + + if (commit1Error.get() != null) { + throw commit1Error.get(); + } + + testKit.doCommit(cohort2); + + txChain.close(); + + // 9. Create a new read Tx from the data store and verify + // committed data. + readTx = dataStore.newReadOnlyTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + } + } + + @Test + public void testTransactionChainWithMultipleShards() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) { + + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); + + final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); + + final MapEntryNode person = PeopleModel.newPersonEntry("jack"); + final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.merge(personPath, person); + + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", person, optional.get()); + + final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); + + writeTx = txChain.newWriteOnlyTransaction(); + + writeTx.delete(carPath); + + final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); + + final ListenableFuture canCommit1 = cohort1.canCommit(); + final ListenableFuture canCommit2 = cohort2.canCommit(); + + testKit.doCommit(canCommit1, cohort1); + testKit.doCommit(canCommit2, cohort2); + testKit.doCommit(cohort3); + + txChain.close(); + + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertFalse("isPresent", optional.isPresent()); + + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", person, optional.get()); + } + } + + @Test + public void testCreateChainedTransactionsInQuickSuccession() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) { + + final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder() + .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), + MoreExecutors.directExecutor()); + + final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); + + final List> futures = new ArrayList<>(); + + final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + futures.add(writeTx.commit()); + + int numCars = 100; + for (int i = 0; i < numCars; i++) { + final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + + rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + + futures.add(rwTx.commit()); + } + + for (final ListenableFuture f : futures) { + f.get(5, TimeUnit.SECONDS); + } + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + + txChain.close(); + + broker.close(); + } + } + + @Test + public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) { + + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); + + rwTx1.ready(); + + final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); + + final Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertFalse("isPresent", optional.isPresent()); + + txChain.close(); + } + } + + @Test + public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) { + + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + // Try to create another Tx of each type - each should fail + // b/c the previous Tx wasn't + // readied. + testKit.assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); + } + } + + @Test + public void testCreateChainedTransactionAfterClose() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testCreateChainedTransactionAfterClose", "test-1")) { + + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + txChain.close(); + + // Try to create another Tx of each type - should fail b/c + // the previous Tx was closed. + testKit.assertExceptionOnTxChainCreates(txChain, DOMTransactionChainClosedException.class); + } + } + + @Test + public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) { + + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + // Create a write tx and submit. + final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + // Create read-only tx's and issue a read. + FluentFuture>> readFuture1 = txChain + .newReadOnlyTransaction().read(TestModel.TEST_PATH); + + FluentFuture>> readFuture2 = txChain + .newReadOnlyTransaction().read(TestModel.TEST_PATH); + + // Create another write tx and issue the write. + DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction(); + writeTx2.write(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42)) + .build()); + + // Ensure the reads succeed. + + assertTrue("isPresent", readFuture1.get(5, TimeUnit.SECONDS).isPresent()); + assertTrue("isPresent", readFuture2.get(5, TimeUnit.SECONDS).isPresent()); + + // Ensure the writes succeed. + DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); + + testKit.doCommit(cohort1); + testKit.doCommit(cohort2); + + assertTrue("isPresent", txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH) + .get(5, TimeUnit.SECONDS).isPresent()); + } + } + + @Test + public void testChainedTransactionFailureWithSingleShard() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) { + + final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder() + .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), + MoreExecutors.directExecutor()); + + final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); + final DOMTransactionChain txChain = broker.createTransactionChain(listener); + + final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction(); + + writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, + PeopleModel.emptyContainer()); + + final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + + try { + writeTx.commit().get(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (final ExecutionException e) { + // Expected + } + + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), + any(Throwable.class)); + + txChain.close(); + broker.close(); + } + } + + @Test + public void testChainedTransactionFailureWithMultipleShards() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) { + + final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder() + .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), + MoreExecutors.directExecutor()); + + final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); + final DOMTransactionChain txChain = broker.createTransactionChain(listener); + + final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction(); + + writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, + PeopleModel.emptyContainer()); + + final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + + // Note that merge will validate the data and fail but put + // succeeds b/c deep validation is not + // done for put for performance reasons. + try { + writeTx.commit().get(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (final ExecutionException e) { + // Expected + } + + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), + any(Throwable.class)); + + txChain.close(); + broker.close(); + } + } + + @Test + public void testDataTreeChangeListenerRegistration() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testDataTreeChangeListenerRegistration", "test-1")) { + + testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + + ListenerRegistration listenerReg = dataStore + .registerTreeChangeListener(TestModel.TEST_PATH, listener); + + assertNotNull("registerTreeChangeListener returned null", listenerReg); + + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getTreeChangeListenerActors", 1, + state.getTreeChangeListenerActors().size())); + + // Wait for the initial notification + listener.waitForChangeEvents(TestModel.TEST_PATH); + listener.reset(2); + + // Write 2 updates. + testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42)) + .build()); + + YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); + testKit.testWriteTransaction(dataStore, listPath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + + // Wait for the 2 updates. + listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); + listenerReg.close(); + + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getTreeChangeListenerActors", 0, + state.getTreeChangeListenerActors().size())); + + testKit.testWriteTransaction(dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); + + listener.expectNoMoreChanges("Received unexpected change after close"); + } + } + + @Test + public void testRestoreFromDatastoreSnapshot() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + final String name = "transactionIntegrationTest"; + + final ContainerNode carsNode = CarsModel.newCarsNode( + CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)), + CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L)))); + + DataTree dataTree = new InMemoryDataTreeFactory().create( + DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); + AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); + NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); + + final Snapshot carsSnapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + + dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, + SchemaContextHelper.full()); + + final NormalizedNode peopleNode = PeopleModel.create(); + AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); + + root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); + + final Snapshot peopleSnapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + + testKit.restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList( + new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot), + new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot))); + + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, name, "module-shards-member1.conf", true, "cars", "people")) { + + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + // two reads + Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", carsNode, optional.get()); + + optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("Data node", peopleNode, optional.get()); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index a4d6d2caaf..d984535db8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -12,10 +12,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; import akka.actor.ActorSystem; import akka.actor.Address; @@ -23,18 +19,11 @@ import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.testkit.javadsl.TestKit; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.FluentFuture; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; -import java.math.BigInteger; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -45,54 +34,23 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; -import org.mockito.Mockito; -import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; -import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; -import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; -import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; -import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; -import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; -import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; -import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; -import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; -import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; -import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; -import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.common.api.ReadFailedException; -import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; -import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; -import org.opendaylight.mdsal.dom.api.DOMTransactionChain; -import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException; -import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; -import org.opendaylight.mdsal.dom.spi.store.DOMStore; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; @RunWith(Parameterized.class) -public class DistributedDataStoreIntegrationTest { +public class DistributedDataStoreIntegrationTest extends AbstractDistributedDataStoreIntegrationTest { @Parameters(name = "{0}") public static Collection data() { @@ -101,14 +59,6 @@ public class DistributedDataStoreIntegrationTest { }); } - @Parameter - public Class testParameter; - - private ActorSystem system; - - private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() - .shardHeartbeatIntervalInMillis(100); - @Before public void setUp() { InMemorySnapshotStore.clear(); @@ -124,193 +74,6 @@ public class DistributedDataStoreIntegrationTest { system = null; } - protected ActorSystem getSystem() { - return system; - } - - @Test - public void testWriteTransactionWithSingleShard() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "transactionIntegrationTest", "test-1")) { - - testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) - .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42)) - .build()); - } - } - - @Test - public void testWriteTransactionWithMultipleShards() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) { - - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); - - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - - testKit.doCommit(writeTx.ready()); - - writeTx = dataStore.newWriteOnlyTransaction(); - - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - - testKit.doCommit(writeTx.ready()); - - writeTx = dataStore.newWriteOnlyTransaction(); - - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - writeTx.write(carPath, car); - - final MapEntryNode person = PeopleModel.newPersonEntry("jack"); - final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - writeTx.write(personPath, person); - - testKit.doCommit(writeTx.ready()); - - // Verify the data in the store - final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - - Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", car, optional.get()); - - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", person, optional.get()); - } - } - - @Test - public void testReadWriteTransactionWithSingleShard() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) { - - // 1. Create a read-write Tx - final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); - - // 2. Write some data - final YangInstanceIdentifier nodePath = TestModel.TEST_PATH; - final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - readWriteTx.write(nodePath, nodeToWrite); - - // 3. Read the data from Tx - final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS); - assertEquals("exists", Boolean.TRUE, exists); - - Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); - - // 4. Ready the Tx for commit - final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); - - // 5. Commit the Tx - testKit.doCommit(cohort); - - // 6. Verify the data in the store - final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - - optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); - } - } - - @Test - public void testReadWriteTransactionWithMultipleShards() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) { - - DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); - - readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - - testKit.doCommit(readWriteTx.ready()); - - readWriteTx = dataStore.newReadWriteTransaction(); - - readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - - testKit.doCommit(readWriteTx.ready()); - - readWriteTx = dataStore.newReadWriteTransaction(); - - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - readWriteTx.write(carPath, car); - - final MapEntryNode person = PeopleModel.newPersonEntry("jack"); - final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - readWriteTx.write(personPath, person); - - final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS); - assertEquals("exists", Boolean.TRUE, exists); - - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", car, optional.get()); - - testKit.doCommit(readWriteTx.ready()); - - // Verify the data in the store - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - - optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", car, optional.get()); - - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", person, optional.get()); - } - } - - @Test - public void testSingleTransactionsWritesInQuickSuccession() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) { - - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - testKit.doCommit(writeTx.ready()); - - writeTx = txChain.newWriteOnlyTransaction(); - - int numCars = 5; - for (int i = 0; i < numCars; i++) { - writeTx.write(CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); - } - - testKit.doCommit(writeTx.ready()); - - final Optional> optional = txChain.newReadOnlyTransaction() - .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); - } - } - @SuppressWarnings("checkstyle:IllegalCatch") private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly) throws Exception { @@ -606,602 +369,4 @@ public class DistributedDataStoreIntegrationTest { } } - @SuppressWarnings("checkstyle:IllegalCatch") - private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) - throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - final String shardName = "default"; - - // We don't want the shard to become the leader so prevent shard - // elections. - datastoreContextBuilder.customRaftPolicyImplementation( - "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy"); - - // The ShardManager uses the election timeout for FindPrimary so - // reset it low so it will timeout quickly. - datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1) - .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2); - - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) { - - final Object result = dataStore.getActorUtils().executeOperation( - dataStore.getActorUtils().getShardManager(), new FindLocalShard(shardName, true)); - assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); - - // Create the write Tx. - DOMStoreWriteTransaction writeTxToClose = null; - try { - writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction() - : dataStore.newReadWriteTransaction(); - final DOMStoreWriteTransaction writeTx = writeTxToClose; - assertNotNull("newReadWriteTransaction returned null", writeTx); - - // Do some modifications and ready the Tx on a separate - // thread. - final AtomicReference txCohort = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReady = new CountDownLatch(1); - final Thread txThread = new Thread(() -> { - try { - writeTx.write(TestModel.JUNK_PATH, - ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); - - txCohort.set(writeTx.ready()); - } catch (Exception e) { - caughtEx.set(e); - } finally { - txReady.countDown(); - } - }); - - txThread.start(); - - // Wait for the Tx operations to complete. - boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); - if (caughtEx.get() != null) { - throw caughtEx.get(); - } - - assertTrue("Tx ready", done); - - // Wait for the commit to complete. Since no shard - // leader was elected in time, the Tx - // should have timed out and throw an appropriate - // exception cause. - try { - txCohort.get().canCommit().get(10, TimeUnit.SECONDS); - fail("Expected NoShardLeaderException"); - } catch (final ExecutionException e) { - final String msg = "Unexpected exception: " - + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.equals(testParameter)) { - assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException); - } else { - assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); - } - } - } finally { - try { - if (writeTxToClose != null) { - writeTxToClose.close(); - } - } catch (Exception e) { - // FIXME TransactionProxy.close throws IllegalStateException: - // Transaction is ready, it cannot be closed - } - } - } - } - - @Test - public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception { - datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); - testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader"); - } - - @Test - public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception { - testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader"); - } - - @Test - public void testTransactionAbort() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "transactionAbortIntegrationTest", "test-1")) { - - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); - - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - - cohort.canCommit().get(5, TimeUnit.SECONDS); - - cohort.abort().get(5, TimeUnit.SECONDS); - - testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - } - } - - @Test - @SuppressWarnings("checkstyle:IllegalCatch") - public void testTransactionChainWithSingleShard() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testTransactionChainWithSingleShard", "test-1")) { - - // 1. Create a Tx chain and write-only Tx - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - - final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); - - // 2. Write some data - final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - writeTx.write(TestModel.TEST_PATH, testNode); - - // 3. Ready the Tx for commit - final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - - // 4. Commit the Tx on another thread that first waits for - // the second read Tx. - final CountDownLatch continueCommit1 = new CountDownLatch(1); - final CountDownLatch commit1Done = new CountDownLatch(1); - final AtomicReference commit1Error = new AtomicReference<>(); - new Thread(() -> { - try { - continueCommit1.await(); - testKit.doCommit(cohort1); - } catch (Exception e) { - commit1Error.set(e); - } finally { - commit1Done.countDown(); - } - }).start(); - - // 5. Create a new read Tx from the chain to read and verify - // the data from the first - // Tx is visible after being readied. - DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); - Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", testNode, optional.get()); - - // 6. Create a new RW Tx from the chain, write more data, - // and ready it - final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) - .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42)) - .build(); - rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); - - final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); - - // 7. Create a new read Tx from the chain to read the data - // from the last RW Tx to - // verify it is visible. - readTx = txChain.newReadWriteTransaction(); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", outerNode, optional.get()); - - // 8. Wait for the 2 commits to complete and close the - // chain. - continueCommit1.countDown(); - Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); - - if (commit1Error.get() != null) { - throw commit1Error.get(); - } - - testKit.doCommit(cohort2); - - txChain.close(); - - // 9. Create a new read Tx from the data store and verify - // committed data. - readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", outerNode, optional.get()); - } - } - - @Test - public void testTransactionChainWithMultipleShards() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) { - - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); - - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - - final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - - final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); - - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - readWriteTx.write(carPath, car); - - final MapEntryNode person = PeopleModel.newPersonEntry("jack"); - final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - readWriteTx.merge(personPath, person); - - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", car, optional.get()); - - optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", person, optional.get()); - - final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); - - writeTx = txChain.newWriteOnlyTransaction(); - - writeTx.delete(carPath); - - final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); - - final ListenableFuture canCommit1 = cohort1.canCommit(); - final ListenableFuture canCommit2 = cohort2.canCommit(); - - testKit.doCommit(canCommit1, cohort1); - testKit.doCommit(canCommit2, cohort2); - testKit.doCommit(cohort3); - - txChain.close(); - - final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - - optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertFalse("isPresent", optional.isPresent()); - - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", person, optional.get()); - } - } - - @Test - public void testCreateChainedTransactionsInQuickSuccession() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) { - - final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( - ImmutableMap.builder() - .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), - MoreExecutors.directExecutor()); - - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); - DOMTransactionChain txChain = broker.createTransactionChain(listener); - - final List> futures = new ArrayList<>(); - - final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - futures.add(writeTx.commit()); - - int numCars = 100; - for (int i = 0; i < numCars; i++) { - final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - - rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); - - futures.add(rwTx.commit()); - } - - for (final ListenableFuture f : futures) { - f.get(5, TimeUnit.SECONDS); - } - - final Optional> optional = txChain.newReadOnlyTransaction() - .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); - - txChain.close(); - - broker.close(); - } - } - - @Test - public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) { - - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - - final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); - - rwTx1.ready(); - - final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); - - final Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - assertFalse("isPresent", optional.isPresent()); - - txChain.close(); - } - } - - @Test - public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) { - - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - - final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); - - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - // Try to create another Tx of each type - each should fail - // b/c the previous Tx wasn't - // readied. - testKit.assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); - } - } - - @Test - public void testCreateChainedTransactionAfterClose() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testCreateChainedTransactionAfterClose", "test-1")) { - - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - txChain.close(); - - // Try to create another Tx of each type - should fail b/c - // the previous Tx was closed. - testKit.assertExceptionOnTxChainCreates(txChain, DOMTransactionChainClosedException.class); - } - } - - @Test - public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) { - - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - - // Create a write tx and submit. - final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - - // Create read-only tx's and issue a read. - FluentFuture>> readFuture1 = txChain - .newReadOnlyTransaction().read(TestModel.TEST_PATH); - - FluentFuture>> readFuture2 = txChain - .newReadOnlyTransaction().read(TestModel.TEST_PATH); - - // Create another write tx and issue the write. - DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction(); - writeTx2.write(TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) - .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42)) - .build()); - - // Ensure the reads succeed. - - assertTrue("isPresent", readFuture1.get(5, TimeUnit.SECONDS).isPresent()); - assertTrue("isPresent", readFuture2.get(5, TimeUnit.SECONDS).isPresent()); - - // Ensure the writes succeed. - DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); - - testKit.doCommit(cohort1); - testKit.doCommit(cohort2); - - assertTrue("isPresent", txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH) - .get(5, TimeUnit.SECONDS).isPresent()); - } - } - - @Test - public void testChainedTransactionFailureWithSingleShard() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) { - - final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( - ImmutableMap.builder() - .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), - MoreExecutors.directExecutor()); - - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); - final DOMTransactionChain txChain = broker.createTransactionChain(listener); - - final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction(); - - writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, - PeopleModel.emptyContainer()); - - final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - - writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } - - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), - any(Throwable.class)); - - txChain.close(); - broker.close(); - } - } - - @Test - public void testChainedTransactionFailureWithMultipleShards() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) { - - final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( - ImmutableMap.builder() - .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), - MoreExecutors.directExecutor()); - - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); - final DOMTransactionChain txChain = broker.createTransactionChain(listener); - - final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction(); - - writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, - PeopleModel.emptyContainer()); - - final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - - writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - - // Note that merge will validate the data and fail but put - // succeeds b/c deep validation is not - // done for put for performance reasons. - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } - - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), - any(Throwable.class)); - - txChain.close(); - broker.close(); - } - } - - @Test - public void testDataTreeChangeListenerRegistration() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testDataTreeChangeListenerRegistration", "test-1")) { - - testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - - ListenerRegistration listenerReg = dataStore - .registerTreeChangeListener(TestModel.TEST_PATH, listener); - - assertNotNull("registerTreeChangeListener returned null", listenerReg); - - IntegrationTestKit.verifyShardState(dataStore, "test-1", - state -> assertEquals("getTreeChangeListenerActors", 1, - state.getTreeChangeListenerActors().size())); - - // Wait for the initial notification - listener.waitForChangeEvents(TestModel.TEST_PATH); - listener.reset(2); - - // Write 2 updates. - testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) - .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42)) - .build()); - - YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); - testKit.testWriteTransaction(dataStore, listPath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - - // Wait for the 2 updates. - listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); - listenerReg.close(); - - IntegrationTestKit.verifyShardState(dataStore, "test-1", - state -> assertEquals("getTreeChangeListenerActors", 0, - state.getTreeChangeListenerActors().size())); - - testKit.testWriteTransaction(dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); - - listener.expectNoMoreChanges("Received unexpected change after close"); - } - } - - @Test - public void testRestoreFromDatastoreSnapshot() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - final String name = "transactionIntegrationTest"; - - final ContainerNode carsNode = CarsModel.newCarsNode( - CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)), - CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L)))); - - DataTree dataTree = new InMemoryDataTreeFactory().create( - DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); - AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); - NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); - - final Snapshot carsSnapshot = Snapshot.create( - new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), - Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); - - dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, - SchemaContextHelper.full()); - - final NormalizedNode peopleNode = PeopleModel.create(); - AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); - - root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); - - final Snapshot peopleSnapshot = Snapshot.create( - new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), - Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); - - testKit.restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList( - new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot), - new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot))); - - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, name, "module-shards-member1.conf", true, "cars", "people")) { - - final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - - // two reads - Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", carsNode, optional.get()); - - optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", peopleNode, optional.get()); - } - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreWithSegmentedJournalIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreWithSegmentedJournalIntegrationTest.java new file mode 100644 index 0000000000..73888de390 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreWithSegmentedJournalIntegrationTest.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.runners.Parameterized.Parameters; +import static org.opendaylight.controller.md.cluster.datastore.model.CarsModel.CAR_QNAME; + +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import akka.cluster.Cluster; +import akka.testkit.javadsl.TestKit; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.ConfigFactory; +import java.io.File; +import java.io.IOException; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; +import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; + +@RunWith(Parameterized.class) +public class DistributedDataStoreWithSegmentedJournalIntegrationTest + extends AbstractDistributedDataStoreIntegrationTest { + + @Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + { DistributedDataStore.class }}); + } + + @Before + public void setUp() { + InMemorySnapshotStore.clear(); + system = ActorSystem.create("cluster-test", + ConfigFactory.load("segmented.conf").getConfig("Member1")); + cleanSnapshotDir(system); + + Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"); + Cluster.get(system).join(member1Address); + } + + @After + public void tearDown() { + TestKit.shutdownActorSystem(system, true); + system = null; + } + + private static void cleanSnapshotDir(final ActorSystem system) { + File journalDir = new File(system.settings().config() + .getString("akka.persistence.journal.segmented-file.root-directory")); + + if (!journalDir.exists()) { + return; + } + + try { + FileUtils.cleanDirectory(journalDir); + } catch (IOException e) { + // Ignore + } + } + + @Test + public void testManyWritesDeletes() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + CollectionNodeBuilder carMapBuilder = ImmutableNodes.mapNodeBuilder(CAR_QNAME); + + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testManyWritesDeletes", "module-shards-cars-member-1.conf", true, "cars")) { + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + testKit.doCommit(writeTx.ready()); + + int numCars = 20; + for (int i = 0; i < numCars; ++i) { + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + + YangInstanceIdentifier path = CarsModel.newCarPath("car" + i); + MapEntryNode data = CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)); + + rwTx.merge(path, data); + carMapBuilder.withChild(data); + + testKit.doCommit(rwTx.ready()); + + if (i % 5 == 0) { + rwTx = txChain.newReadWriteTransaction(); + + rwTx.delete(path); + carMapBuilder.withoutChild(path.getLastPathArgument()); + testKit.doCommit(rwTx.ready()); + } + } + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + + MapNode cars = carMapBuilder.build(); + + assertEquals("cars not matching result", cars, optional.get()); + + txChain.close(); + + + // wait until the journal is actually persisted, killing the datastore early results in missing entries + Stopwatch sw = Stopwatch.createStarted(); + AtomicBoolean done = new AtomicBoolean(false); + while (!done.get()) { + MemberNode.verifyRaftState(dataStore, "cars", raftState -> { + if (raftState.getLastApplied() == raftState.getLastLogIndex()) { + done.set(true); + } + }); + + assertTrue("Shard did not persist all journal entries in time.", sw.elapsed(TimeUnit.SECONDS) <= 5); + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + } + + // test restoration from journal and verify data matches + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testManyWritesDeletes", "module-shards-cars-member-1.conf", true, "cars")) { + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + MapNode cars = carMapBuilder.build(); + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("restored cars do not match snapshot", cars, optional.get()); + + txChain.close(); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/segmented.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/segmented.conf new file mode 100644 index 0000000000..c667887b54 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/segmented.conf @@ -0,0 +1,73 @@ +Member1 { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } + + in-memory-snapshot-store { + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } + + shard-dispatcher { + type = Dispatcher + executor = "default-executor" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.UnboundedDequeBasedControlAwareMailbox" + } + + akka { + persistence { + snapshot-store.plugin = "in-memory-snapshot-store" + + journal { + plugin = "akka.persistence.journal.segmented-file" + + segmented-file { + class = "org.opendaylight.controller.akka.segjournal.SegmentedFileJournal" + root-directory = "target/segmented-journal" + max-entry-size = 8M + max-segment-size = 32M + memory-mapped = false + } + } + } + + loglevel = "INFO" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + + serializers { + readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer" + } + + serialization-bindings { + "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal + } + + warn-about-java-serializer-usage = false + } + remote { + log-remote-lifecycle-events = off + artery { + enabled = on + canonical.hostname = "127.0.0.1" + canonical.port = 2558 + } + + netty.tcp { + hostname = "127.0.0.1" + port = 2558 + } + } + + cluster { + retry-unsuccessful-join-after = 100ms + + roles = [ + "member-1" + ] + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties index 528cd3f79b..0e8e09e1e4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties @@ -11,3 +11,4 @@ org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.ut org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.sharding=debug org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.access.client=debug org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.messaging=debug +org.slf4j.simpleLogger.log.org.opendaylight.controller.akka.segjournal=debug