X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreIntegrationTest.java;h=e5b14b021e64aa770691aaf06e647ecc4ae4b3f3;hp=2319c5be384326a61ef8fb0d5c7519d2e812c498;hb=95c296a7c1e8e186a88a0a0dc82e080b2185db33;hpb=340a2d4c979ac6f8d5adff8bd9e1c9f724e7a164 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 2319c5be38..e5b14b021e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -1,28 +1,44 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; + import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.io.ObjectOutputStream; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -32,12 +48,23 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; +import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; @@ -60,20 +87,23 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; public class DistributedDataStoreIntegrationTest { private static ActorSystem system; - private final DatastoreContext.Builder datastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100); + private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(100); @BeforeClass public static void setUpClass() throws IOException { system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); - Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); + Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"); Cluster.get(system).join(member1Address); } @@ -88,294 +118,302 @@ public class DistributedDataStoreIntegrationTest { } @Test - public void testWriteTransactionWithSingleShard() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("transactionIntegrationTest", "test-1"); - - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + public void testWriteTransactionWithSingleShard() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", + "test-1")) { - testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - cleanup(dataStore); - }}; + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + } + } + }; } @Test - public void testWriteTransactionWithMultipleShards() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1"); + public void testWriteTransactionWithMultipleShards() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) { - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - doCommit(writeTx.ready()); + doCommit(writeTx.ready()); - writeTx = dataStore.newWriteOnlyTransaction(); + writeTx = dataStore.newWriteOnlyTransaction(); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - doCommit(writeTx.ready()); + doCommit(writeTx.ready()); - writeTx = dataStore.newWriteOnlyTransaction(); + writeTx = dataStore.newWriteOnlyTransaction(); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - writeTx.write(carPath, car); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + writeTx.write(carPath, car); - MapEntryNode person = PeopleModel.newPersonEntry("jack"); - YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - writeTx.write(personPath, person); + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + writeTx.write(personPath, person); - doCommit(writeTx.ready()); + doCommit(writeTx.ready()); - // Verify the data in the store + // Verify the data in the store - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); - - cleanup(dataStore); - }}; + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + } + } + }; } @Test - public void testReadWriteTransactionWithSingleShard() throws Exception{ - System.setProperty("shard.persistent", "true"); - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1"); + public void testReadWriteTransactionWithSingleShard() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testReadWriteTransactionWithSingleShard", "test-1")) { - // 1. Create a read-write Tx + // 1. Create a read-write Tx - DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); + DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); - // 2. Write some data + // 2. Write some data - YangInstanceIdentifier nodePath = TestModel.TEST_PATH; - NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - readWriteTx.write(nodePath, nodeToWrite ); + YangInstanceIdentifier nodePath = TestModel.TEST_PATH; + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + readWriteTx.write(nodePath, nodeToWrite); - // 3. Read the data from Tx + // 3. Read the data from Tx - Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS); - assertEquals("exists", true, exists); + Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS); + assertEquals("exists", true, exists); - Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); + Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); - // 4. Ready the Tx for commit + // 4. Ready the Tx for commit - DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); + DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); - // 5. Commit the Tx + // 5. Commit the Tx - doCommit(cohort); + doCommit(cohort); - // 6. Verify the data in the store + // 6. Verify the data in the store - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); - - cleanup(dataStore); - }}; + optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); + } + } + }; } @Test - public void testReadWriteTransactionWithMultipleShards() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1"); + public void testReadWriteTransactionWithMultipleShards() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) { - DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); + DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); - readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - doCommit(readWriteTx.ready()); + doCommit(readWriteTx.ready()); - readWriteTx = dataStore.newReadWriteTransaction(); + readWriteTx = dataStore.newReadWriteTransaction(); - readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - doCommit(readWriteTx.ready()); + doCommit(readWriteTx.ready()); - readWriteTx = dataStore.newReadWriteTransaction(); + readWriteTx = dataStore.newReadWriteTransaction(); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - readWriteTx.write(carPath, car); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); - MapEntryNode person = PeopleModel.newPersonEntry("jack"); - YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - readWriteTx.write(personPath, person); + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.write(personPath, person); - Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS); - assertEquals("exists", true, exists); + Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS); + assertEquals("exists", true, exists); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - doCommit(readWriteTx.ready()); + doCommit(readWriteTx.ready()); - // Verify the data in the store + // Verify the data in the store - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); - cleanup(dataStore); - }}; + } + } + }; } @Test - public void testSingleTransactionsWritesInQuickSuccession() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testSingleTransactionsWritesInQuickSuccession", "cars-1"); + public void testSingleTransactionsWritesInQuickSuccession() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testSingleTransactionsWritesInQuickSuccession", "cars-1")) { - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - doCommit(writeTx.ready()); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + doCommit(writeTx.ready()); - writeTx = txChain.newWriteOnlyTransaction(); - - int nCars = 5; - for(int i = 0; i < nCars; i++) { - writeTx.write(CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); - } + writeTx = txChain.newWriteOnlyTransaction(); - doCommit(writeTx.ready()); + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx.write(CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + } - Optional> optional = txChain.newReadOnlyTransaction().read( - CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("# cars", nCars, ((Collection)optional.get().getValue()).size()); + doCommit(writeTx.ready()); - cleanup(dataStore); - }}; + Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + } + } + }; } - private void testTransactionWritesWithShardNotInitiallyReady(final String testName, - final boolean writeOnly) throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - String shardName = "test-1"; + @SuppressWarnings("checkstyle:IllegalCatch") + private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly) + throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + String shardName = "test-1"; + + // Setup the InMemoryJournal to block shard recovery to ensure + // the shard isn't + // initialized until we create and submit the write the Tx. + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + + // Create the write Tx + + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() + : dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); + + // Do some modification operations and ready the Tx on a + // separate thread. + + final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier + .builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); + + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + writeTx.merge(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + writeTx.write(listEntryPath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + + writeTx.delete(listEntryPath); + + txCohort.set(writeTx.ready()); + } catch (Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if (caughtEx.get() != null) { + throw caughtEx.get(); + } - // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't - // initialized until we create and submit the write the Tx. - String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - CountDownLatch blockRecoveryLatch = new CountDownLatch(1); - InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + assertEquals("Tx ready", true, done); - DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + // At this point the Tx operations should be waiting for the + // shard to initialize so + // trigger the latch to let the shard recovery to continue. - // Create the write Tx + blockRecoveryLatch.countDown(); - final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : - dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", writeTx); + // Wait for the Tx commit to complete. - // Do some modification operations and ready the Tx on a separate thread. + doCommit(txCohort.get()); - final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder( - TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, - TestModel.ID_QNAME, 1).build(); + // Verify the data in the store - final AtomicReference txCohort = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReady = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - writeTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder( - TestModel.OUTER_LIST_QNAME).build()); + Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); - writeTx.write(listEntryPath, ImmutableNodes.mapEntry( - TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); - writeTx.delete(listEntryPath); - - txCohort.set(writeTx.ready()); - } catch(Exception e) { - caughtEx.set(e); - return; - } finally { - txReady.countDown(); - } + optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); } - }; - - txThread.start(); - - // Wait for the Tx operations to complete. - - boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); - if(caughtEx.get() != null) { - throw caughtEx.get(); } - - assertEquals("Tx ready", true, done); - - // At this point the Tx operations should be waiting for the shard to initialize so - // trigger the latch to let the shard recovery to continue. - - blockRecoveryLatch.countDown(); - - // Wait for the Tx commit to complete. - - doCommit(txCohort.get()); - - // Verify the data in the store - - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - - Optional> optional = readTx.read(TestModel.TEST_PATH). - get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - - optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", false, optional.isPresent()); - - cleanup(dataStore); - }}; + }; } @Test @@ -390,708 +428,914 @@ public class DistributedDataStoreIntegrationTest { } @Test + @SuppressWarnings("checkstyle:IllegalCatch") public void testTransactionReadsWithShardNotInitiallyReady() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - String testName = "testTransactionReadsWithShardNotInitiallyReady"; - String shardName = "test-1"; - - // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't - // initialized until we create the Tx. - String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - CountDownLatch blockRecoveryLatch = new CountDownLatch(1); - InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - - DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); - - // Create the read-write Tx - - final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); - - // Do some reads on the Tx on a separate thread. - - final AtomicReference> txExistsFuture = - new AtomicReference<>(); - final AtomicReference>, ReadFailedException>> - txReadFuture = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReadsDone = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - readWriteTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + String testName = "testTransactionReadsWithShardNotInitiallyReady"; + String shardName = "test-1"; + + // Setup the InMemoryJournal to block shard recovery to ensure + // the shard isn't + // initialized until we create the Tx. + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + + // Create the read-write Tx + + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); + + // Do some reads on the Tx on a separate thread. + + final AtomicReference> txExistsFuture = + new AtomicReference<>(); + final AtomicReference>, ReadFailedException>> + txReadFuture = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReadsDone = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + readWriteTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH)); + + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + } catch (Exception e) { + caughtEx.set(e); + return; + } finally { + txReadsDone.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS); + if (caughtEx.get() != null) { + throw caughtEx.get(); + } - txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH)); + assertEquals("Tx reads done", true, done); - txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); - } catch(Exception e) { - caughtEx.set(e); - return; - } finally { - txReadsDone.countDown(); - } - } - }; + // At this point the Tx operations should be waiting for the + // shard to initialize so + // trigger the latch to let the shard recovery to continue. - txThread.start(); + blockRecoveryLatch.countDown(); - // Wait for the Tx operations to complete. + // Wait for the reads to complete and verify. - boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS); - if(caughtEx.get() != null) { - throw caughtEx.get(); + assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS)); + assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent()); + + readWriteTx.close(); + } } + }; + } - assertEquals("Tx reads done", true, done); + @Test(expected = NotInitializedException.class) + @SuppressWarnings("checkstyle:IllegalCatch") + public void testTransactionCommitFailureWithShardNotInitialized() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + String testName = "testTransactionCommitFailureWithShardNotInitialized"; + String shardName = "test-1"; - // At this point the Tx operations should be waiting for the shard to initialize so - // trigger the latch to let the shard recovery to continue. + // Set the shard initialization timeout low for the test. - blockRecoveryLatch.countDown(); + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); - // Wait for the reads to complete and verify. + // Setup the InMemoryJournal to block shard recovery + // indefinitely. - assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS)); - assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent()); + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - readWriteTx.close(); + InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); - cleanup(dataStore); - }}; - } + try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { - @Test(expected=NotInitializedException.class) - public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - String testName = "testTransactionCommitFailureWithShardNotInitialized"; - String shardName = "test-1"; + // Create the write Tx - // Set the shard initialization timeout low for the test. + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); - datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); + // Do some modifications and ready the Tx on a separate + // thread. - // Setup the InMemoryJournal to block shard recovery indefinitely. + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - CountDownLatch blockRecoveryLatch = new CountDownLatch(1); - InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + txCohort.set(writeTx.ready()); + } catch (Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } + } + }; - DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + txThread.start(); - // Create the write Tx + // Wait for the Tx operations to complete. - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newReadWriteTransaction returned null", writeTx); + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if (caughtEx.get() != null) { + throw caughtEx.get(); + } - // Do some modifications and ready the Tx on a separate thread. + assertEquals("Tx ready", true, done); - final AtomicReference txCohort = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReady = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - writeTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + // Wait for the commit to complete. Since the shard never + // initialized, the Tx should + // have timed out and throw an appropriate exception cause. - txCohort.set(writeTx.ready()); - } catch(Exception e) { - caughtEx.set(e); - return; + try { + txCohort.get().canCommit().get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + Throwables.propagateIfInstanceOf(e.getCause(), Exception.class); + Throwables.propagate(e.getCause()); } finally { - txReady.countDown(); + blockRecoveryLatch.countDown(); } } - }; + } + }; + } - txThread.start(); + @Test(expected = NotInitializedException.class) + @SuppressWarnings("checkstyle:IllegalCatch") + public void testTransactionReadFailureWithShardNotInitialized() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + String testName = "testTransactionReadFailureWithShardNotInitialized"; + String shardName = "test-1"; - // Wait for the Tx operations to complete. + // Set the shard initialization timeout low for the test. - boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); - if(caughtEx.get() != null) { - throw caughtEx.get(); - } + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); - assertEquals("Tx ready", true, done); + // Setup the InMemoryJournal to block shard recovery + // indefinitely. - // Wait for the commit to complete. Since the shard never initialized, the Tx should - // have timed out and throw an appropriate exception cause. + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - try { - txCohort.get().canCommit().get(5, TimeUnit.SECONDS); - } catch(ExecutionException e) { - throw e.getCause(); - } finally { - blockRecoveryLatch.countDown(); - cleanup(dataStore); - } - }}; - } + InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); - @Test(expected=NotInitializedException.class) - public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - String testName = "testTransactionReadFailureWithShardNotInitialized"; - String shardName = "test-1"; + try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { - // Set the shard initialization timeout low for the test. + // Create the read-write Tx - datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); - // Setup the InMemoryJournal to block shard recovery indefinitely. + // Do a read on the Tx on a separate thread. - String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - CountDownLatch blockRecoveryLatch = new CountDownLatch(1); - InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + final AtomicReference>, ReadFailedException>> + txReadFuture = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReadDone = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + readWriteTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); - // Create the read-write Tx + readWriteTx.close(); + } catch (Exception e) { + caughtEx.set(e); + return; + } finally { + txReadDone.countDown(); + } + } + }; - final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); + txThread.start(); - // Do a read on the Tx on a separate thread. + // Wait for the Tx operations to complete. - final AtomicReference>, ReadFailedException>> - txReadFuture = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReadDone = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - readWriteTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS); + if (caughtEx.get() != null) { + throw caughtEx.get(); + } - txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + assertEquals("Tx read done", true, done); - readWriteTx.close(); - } catch(Exception e) { - caughtEx.set(e); - return; + // Wait for the read to complete. Since the shard never + // initialized, the Tx should + // have timed out and throw an appropriate exception cause. + + try { + txReadFuture.get().checkedGet(5, TimeUnit.SECONDS); + } catch (ReadFailedException e) { + Throwables.propagateIfInstanceOf(e.getCause(), Exception.class); + Throwables.propagate(e.getCause()); } finally { - txReadDone.countDown(); + blockRecoveryLatch.countDown(); } } - }; - - txThread.start(); - - // Wait for the Tx operations to complete. - - boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS); - if(caughtEx.get() != null) { - throw caughtEx.get(); } + }; + } - assertEquals("Tx read done", true, done); - - // Wait for the read to complete. Since the shard never initialized, the Tx should - // have timed out and throw an appropriate exception cause. - - try { - txReadFuture.get().checkedGet(5, TimeUnit.SECONDS); - } catch(ReadFailedException e) { - throw e.getCause(); - } finally { - blockRecoveryLatch.countDown(); - cleanup(dataStore); + @SuppressWarnings("checkstyle:IllegalCatch") + private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) + throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + String shardName = "default"; + + // We don't want the shard to become the leader so prevent shard + // elections. + datastoreContextBuilder.customRaftPolicyImplementation( + "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy"); + + // The ShardManager uses the election timeout for FindPrimary so + // reset it low so it will timeout quickly. + datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1) + .shardInitializationTimeout(200, TimeUnit.MILLISECONDS); + + try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + + Object result = dataStore.getActorContext().executeOperation( + dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true)); + assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); + + // Create the write Tx. + + try (DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() + : dataStore.newReadWriteTransaction()) { + assertNotNull("newReadWriteTransaction returned null", writeTx); + + // Do some modifications and ready the Tx on a separate + // thread. + + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.JUNK_PATH, + ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); + + txCohort.set(writeTx.ready()); + } catch (Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if (caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx ready", true, done); + + // Wait for the commit to complete. Since no shard + // leader was elected in time, the Tx + // should have timed out and throw an appropriate + // exception cause. + + try { + txCohort.get().canCommit().get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + Throwables.propagateIfInstanceOf(e.getCause(), Exception.class); + Throwables.propagate(e.getCause()); + } + } + } } - }}; + }; } - private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - String testName = "testTransactionCommitFailureWithNoShardLeader"; - String shardName = "default"; - - // We don't want the shard to become the leader so prevent shard election from completing - // by setting the election timeout, which is based on the heartbeat interval, really high. - - datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000); - datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); + @Test(expected = NoShardLeaderException.class) + public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader"); + } - // Set the leader election timeout low for the test. + @Test(expected = NoShardLeaderException.class) + public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception { + testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader"); + } - datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS); + @Test + public void testTransactionAbort() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest", + "test-1")) { - DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - // Create the write Tx. + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : - dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", writeTx); + DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - // Do some modifications and ready the Tx on a separate thread. + cohort.canCommit().get(5, TimeUnit.SECONDS); - final AtomicReference txCohort = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReady = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - writeTx.write(TestModel.JUNK_PATH, - ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); + cohort.abort().get(5, TimeUnit.SECONDS); - txCohort.set(writeTx.ready()); - } catch(Exception e) { - caughtEx.set(e); - return; - } finally { - txReady.countDown(); - } + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); } - }; - - txThread.start(); - - // Wait for the Tx operations to complete. - - boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); - if(caughtEx.get() != null) { - throw caughtEx.get(); - } - - assertEquals("Tx ready", true, done); - - // Wait for the commit to complete. Since no shard leader was elected in time, the Tx - // should have timed out and throw an appropriate exception cause. - - try { - txCohort.get().canCommit().get(5, TimeUnit.SECONDS); - } catch(ExecutionException e) { - throw e.getCause(); - } finally { - cleanup(dataStore); } - }}; + }; } - @Test(expected=NoShardLeaderException.class) - public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable { - datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); - testTransactionCommitFailureWithNoShardLeader(true); - } + @Test + @SuppressWarnings("checkstyle:IllegalCatch") + public void testTransactionChainWithSingleShard() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", + "test-1")) { - @Test(expected=NoShardLeaderException.class) - public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable { - testTransactionCommitFailureWithNoShardLeader(false); - } + // 1. Create a Tx chain and write-only Tx - @Test - public void testTransactionAbort() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("transactionAbortIntegrationTest", "test-1"); + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + // 2. Write some data - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx.write(TestModel.TEST_PATH, testNode); - cohort.canCommit().get(5, TimeUnit.SECONDS); + // 3. Ready the Tx for commit - cohort.abort().get(5, TimeUnit.SECONDS); + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + // 4. Commit the Tx on another thread that first waits for + // the second read Tx. - cleanup(dataStore); - }}; - } + final CountDownLatch continueCommit1 = new CountDownLatch(1); + final CountDownLatch commit1Done = new CountDownLatch(1); + final AtomicReference commit1Error = new AtomicReference<>(); + new Thread() { + @Override + public void run() { + try { + continueCommit1.await(); + doCommit(cohort1); + } catch (Exception e) { + commit1Error.set(e); + } finally { + commit1Done.countDown(); + } + } + }.start(); - @Test - public void testTransactionChainWithSingleShard() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1"); + // 5. Create a new read Tx from the chain to read and verify + // the data from the first + // Tx is visible after being readied. - // 1. Create a Tx chain and write-only Tx + DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); + Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", testNode, optional.get()); - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + // 6. Create a new RW Tx from the chain, write more data, + // and ready it - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); + rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); - // 2. Write some data + final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); - NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - writeTx.write(TestModel.TEST_PATH, testNode); + // 7. Create a new read Tx from the chain to read the data + // from the last RW Tx to + // verify it is visible. - // 3. Ready the Tx for commit + readTx = txChain.newReadWriteTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); - final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + // 8. Wait for the 2 commits to complete and close the + // chain. - // 4. Commit the Tx on another thread that first waits for the second read Tx. + continueCommit1.countDown(); + Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); - final CountDownLatch continueCommit1 = new CountDownLatch(1); - final CountDownLatch commit1Done = new CountDownLatch(1); - final AtomicReference commit1Error = new AtomicReference<>(); - new Thread() { - @Override - public void run() { - try { - continueCommit1.await(); - doCommit(cohort1); - } catch (Exception e) { - commit1Error.set(e); - } finally { - commit1Done.countDown(); + if (commit1Error.get() != null) { + throw commit1Error.get(); } - } - }.start(); - // 5. Create a new read Tx from the chain to read and verify the data from the first - // Tx is visible after being readied. + doCommit(cohort2); - DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); - Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", testNode, optional.get()); + txChain.close(); - // 6. Create a new RW Tx from the chain, write more data, and ready it + // 9. Create a new read Tx from the data store and verify + // committed data. - DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); - rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); + readTx = dataStore.newReadOnlyTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + } + } + }; + } - DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); + @Test + public void testTransactionChainWithMultipleShards() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testTransactionChainWithMultipleShards", "cars-1", "people-1")) { - // 7. Create a new read Tx from the chain to read the data from the last RW Tx to - // verify it is visible. + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - readTx = txChain.newReadWriteTransaction(); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", outerNode, optional.get()); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - // 8. Wait for the 2 commits to complete and close the chain. + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - continueCommit1.countDown(); - Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - if(commit1Error.get() != null) { - throw commit1Error.get(); - } + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - doCommit(cohort2); + DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); - txChain.close(); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); - // 9. Create a new read Tx from the data store and verify committed data. + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.merge(personPath, person); - readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", outerNode, optional.get()); + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - cleanup(dataStore); - }}; - } + optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); - @Test - public void testTransactionChainWithMultipleShards() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards", - "cars-1", "people-1"); + DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + writeTx = txChain.newWriteOnlyTransaction(); - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + writeTx.delete(carPath); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + ListenableFuture canCommit1 = cohort1.canCommit(); + ListenableFuture canCommit2 = cohort2.canCommit(); - DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + doCommit(canCommit1, cohort1); + doCommit(canCommit2, cohort2); + doCommit(cohort3); - DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); + txChain.close(); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - readWriteTx.write(carPath, car); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - MapEntryNode person = PeopleModel.newPersonEntry("jack"); - YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - readWriteTx.merge(personPath, person); + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + } + } + }; + } - optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); + @Test + public void testCreateChainedTransactionsInQuickSuccession() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionsInQuickSuccession", "cars-1")) { - DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder() + .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), + MoreExecutors.directExecutor()); - writeTx = txChain.newWriteOnlyTransaction(); + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); - writeTx.delete(carPath); + List> futures = new ArrayList<>(); - DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); + DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + futures.add(writeTx.submit()); - ListenableFuture canCommit1 = cohort1.canCommit(); - ListenableFuture canCommit2 = cohort2.canCommit(); + int numCars = 100; + for (int i = 0; i < numCars; i++) { + DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - doCommit(canCommit1, cohort1); - doCommit(canCommit2, cohort2); - doCommit(cohort3); + rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); - txChain.close(); + futures.add(rwTx.submit()); + } - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + for (CheckedFuture f : futures) { + f.checkedGet(); + } - optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", false, optional.isPresent()); + Optional> optional = txChain.newReadOnlyTransaction() + .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); + txChain.close(); - cleanup(dataStore); - }}; + broker.close(); + } + } + }; } @Test - public void testCreateChainedTransactionsInQuickSuccession() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionsInQuickSuccession", "cars-1"); + public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) { - ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( - ImmutableMap.builder().put( - LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); - DOMTransactionChain txChain = broker.createTransactionChain(listener); + DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); - List> futures = new ArrayList<>(); + rwTx1.ready(); - DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - futures.add(writeTx.submit()); + DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); - int nCars = 100; - for(int i = 0; i < nCars; i++) { - DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); - rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); - - futures.add(rwTx.submit()); + txChain.close(); + } } + }; + } - for(CheckedFuture f: futures) { - f.checkedGet(); - } + @Test + public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) { - Optional> optional = txChain.newReadOnlyTransaction().read( - LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("# cars", nCars, ((Collection)optional.get().getValue()).size()); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - txChain.close(); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - broker.close(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - cleanup(dataStore); - }}; + // Try to create another Tx of each type - each should fail + // b/c the previous Tx wasn't + // readied. + + assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); + } + } + }; } @Test - public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionAfterEmptyTxReadied", "test-1"); + public void testCreateChainedTransactionAfterClose() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionAfterClose", "test-1")) { - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); + txChain.close(); - rwTx1.ready(); + // Try to create another Tx of each type - should fail b/c + // the previous Tx was closed. - DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); + assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class); + } + } + }; + } - Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", false, optional.isPresent()); + @Test + public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) { - txChain.close(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - cleanup(dataStore); - }}; - } + // Create a write tx and submit. - @Test - public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionWhenPreviousNotReady", "test-1"); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + // Create read-only tx's and issue a read. - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + CheckedFuture>, ReadFailedException> readFuture1 = txChain + .newReadOnlyTransaction().read(TestModel.TEST_PATH); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + CheckedFuture>, ReadFailedException> readFuture2 = txChain + .newReadOnlyTransaction().read(TestModel.TEST_PATH); - // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't - // readied. + // Create another write tx and issue the write. - assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); - }}; - } + DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction(); + writeTx2.write(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - @Test - public void testCreateChainedTransactionAfterClose() throws Throwable { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionAfterClose", "test-1"); + // Ensure the reads succeed. + + assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent()); + assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent()); - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + // Ensure the writes succeed. - txChain.close(); + DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); - // Try to create another Tx of each type - should fail b/c the previous Tx was closed. + doCommit(cohort1); + doCommit(cohort2); - assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class); - }}; + assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH) + .checkedGet(5, TimeUnit.SECONDS).isPresent()); + } + } + }; } @Test - public void testChainedTransactionFailureWithSingleShard() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testChainedTransactionFailureWithSingleShard", "cars-1"); + public void testChainedTransactionFailureWithSingleShard() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testChainedTransactionFailureWithSingleShard", "cars-1")) { - ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( - ImmutableMap.builder().put( - LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder() + .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), + MoreExecutors.directExecutor()); - TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); - DOMTransactionChain txChain = broker.createTransactionChain(listener); + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); - DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)). - withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + ContainerNode invalidData = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - rwTx.submit().checkedGet(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (TransactionCommitFailedException e) { - // Expected - } + try { + rwTx.submit().checkedGet(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (TransactionCommitFailedException e) { + // Expected + } - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class)); + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), + any(Throwable.class)); - txChain.close(); - broker.close(); - cleanup(dataStore); - }}; + txChain.close(); + broker.close(); + } + } + }; } @Test - public void testChainedTransactionFailureWithMultipleShards() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1"); + public void testChainedTransactionFailureWithMultipleShards() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore( + "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) { - ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( - ImmutableMap.builder().put( - LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder() + .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), + MoreExecutors.directExecutor()); - TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); - DOMTransactionChain txChain = broker.createTransactionChain(listener); + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); - DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, + PeopleModel.emptyContainer()); - ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)). - withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + ContainerNode invalidData = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - // Note that merge will validate the data and fail but put succeeds b/c deep validation is not - // done for put for performance reasons. - writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + // Note that merge will validate the data and fail but put + // succeeds b/c deep validation is not + // done for put for performance reasons. + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - writeTx.submit().checkedGet(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (TransactionCommitFailedException e) { - // Expected - } + try { + writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (TransactionCommitFailedException e) { + // Expected + } - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), + any(Throwable.class)); - txChain.close(); - broker.close(); - cleanup(dataStore); - }}; + txChain.close(); + broker.close(); + } + } + }; } @Test - public void testChangeListenerRegistration() throws Exception{ - new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("testChangeListenerRegistration", "test-1"); + public void testChangeListenerRegistration() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration", + "test-1")) { + + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + MockDataChangeListener listener = new MockDataChangeListener(1); - MockDataChangeListener listener = new MockDataChangeListener(1); + ListenerRegistration listenerReg = dataStore + .registerChangeListener(TestModel.TEST_PATH, listener, DataChangeScope.SUBTREE); - ListenerRegistration - listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener, - DataChangeScope.SUBTREE); + assertNotNull("registerChangeListener returned null", listenerReg); - assertNotNull("registerChangeListener returned null", listenerReg); + // Wait for the initial notification - // Wait for the initial notification + listener.waitForChangeEvents(TestModel.TEST_PATH); - listener.waitForChangeEvents(TestModel.TEST_PATH); + listener.reset(2); - listener.reset(2); + // Write 2 updates. - // Write 2 updates. + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); + testWriteTransaction(dataStore, listPath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). - nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); - testWriteTransaction(dataStore, listPath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + // Wait for the 2 updates. - // Wait for the 2 updates. + listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); - listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); + listenerReg.close(); - listenerReg.close(); + testWriteTransaction(dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); - testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). - nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); + listener.expectNoMoreChanges("Received unexpected change after close"); + } + } + }; + } - listener.expectNoMoreChanges("Received unexpected change after close"); + @Test + public void testRestoreFromDatastoreSnapshot() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + final String name = "transactionIntegrationTest"; + + ContainerNode carsNode = CarsModel.newCarsNode( + CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)), + CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L)))); + + DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); + dataTree.setSchemaContext(SchemaContextHelper.full()); + AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); + NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); + + final Snapshot carsSnapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + + NormalizedNode peopleNode = PeopleModel.create(); + dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); + dataTree.setSchemaContext(SchemaContextHelper.full()); + AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); + root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); + + Snapshot peopleSnapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + + restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList( + new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot), + new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot))); + + try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf", + true, "cars", "people")) { + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", carsNode, optional.get()); + + optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", peopleNode, optional.get()); + } + } + }; + } - cleanup(dataStore); - }}; + @Test + @Deprecated + public void testRecoveryFromPreCarbonSnapshot() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + final String name = "testRecoveryFromPreCarbonSnapshot"; + + ContainerNode carsNode = CarsModel.newCarsNode( + CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)), + CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L)))); + + DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); + dataTree.setSchemaContext(SchemaContextHelper.full()); + AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); + NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); + + MetadataShardDataTreeSnapshot shardSnapshot = new MetadataShardDataTreeSnapshot(root); + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try (final DataOutputStream dos = new DataOutputStream(bos)) { + PayloadVersion.BORON.writeTo(dos); + try (ObjectOutputStream oos = new ObjectOutputStream(dos)) { + oos.writeObject(shardSnapshot); + } + } + + final org.opendaylight.controller.cluster.raft.Snapshot snapshot = + org.opendaylight.controller.cluster.raft.Snapshot.create(bos.toByteArray(), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + + InMemorySnapshotStore.addSnapshot("member-1-shard-cars-" + name, snapshot); + + try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf", + true, "cars")) { + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", carsNode, optional.get()); + } + } + }; } }