X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreIntegrationTest.java;h=d0798688c813fa26f4ed516aecb2f5a1ed4133c7;hp=5bd67b4ed45ca95ea63a2da9e99292be9898faea;hb=e9fc7e7ed2b13d274518d6a872ab67749ef4507a;hpb=2f77e92af7a68b4a97dbfb709c6cc9b11a49878a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 5bd67b4ed4..d0798688c8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -21,7 +21,7 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; -import akka.testkit.JavaTestKit; +import akka.testkit.javadsl.TestKit; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -30,7 +30,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; @@ -42,10 +41,16 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; import org.mockito.Mockito; +import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; +import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -55,7 +60,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; @@ -85,28 +90,41 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +@RunWith(Parameterized.class) public class DistributedDataStoreIntegrationTest { - private static ActorSystem system; + @Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + { DistributedDataStore.class }, { ClientBackedDataStore.class } + }); + } + + @Parameter + public Class testParameter; + + private ActorSystem system; private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() .shardHeartbeatIntervalInMillis(100); - @BeforeClass - public static void setUpClass() throws IOException { + @Before + public void setUp() throws IOException { + InMemorySnapshotStore.clear(); + InMemoryJournal.clear(); system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"); Cluster.get(system).join(member1Address); } - @AfterClass - public static void tearDownClass() throws IOException { - JavaTestKit.shutdownActorSystem(system); + @After + public void tearDown() throws IOException { + TestKit.shutdownActorSystem(system, Boolean.TRUE); system = null; } @@ -118,8 +136,8 @@ public class DistributedDataStoreIntegrationTest { public void testWriteTransactionWithSingleShard() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", - "test-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "transactionIntegrationTest", "test-1")) { testWriteTransaction(dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -135,8 +153,8 @@ public class DistributedDataStoreIntegrationTest { public void testWriteTransactionWithMultipleShards() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) { DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); @@ -155,19 +173,18 @@ public class DistributedDataStoreIntegrationTest { writeTx = dataStore.newWriteOnlyTransaction(); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); writeTx.write(carPath, car); - MapEntryNode person = PeopleModel.newPersonEntry("jack"); - YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + final MapEntryNode person = PeopleModel.newPersonEntry("jack"); + final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); writeTx.write(personPath, person); doCommit(writeTx.ready()); // Verify the data in the store - - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); @@ -185,23 +202,20 @@ public class DistributedDataStoreIntegrationTest { public void testReadWriteTransactionWithSingleShard() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testReadWriteTransactionWithSingleShard", "test-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) { // 1. Create a read-write Tx - - DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", readWriteTx); // 2. Write some data - - YangInstanceIdentifier nodePath = TestModel.TEST_PATH; - NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + final YangInstanceIdentifier nodePath = TestModel.TEST_PATH; + final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); readWriteTx.write(nodePath, nodeToWrite); // 3. Read the data from Tx - - Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS); + final Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS); assertEquals("exists", true, exists); Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); @@ -209,16 +223,13 @@ public class DistributedDataStoreIntegrationTest { assertEquals("Data node", nodeToWrite, optional.get()); // 4. Ready the Tx for commit - - DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); // 5. Commit the Tx - doCommit(cohort); // 6. Verify the data in the store - - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); @@ -232,8 +243,8 @@ public class DistributedDataStoreIntegrationTest { public void testReadWriteTransactionWithMultipleShards() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) { DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", readWriteTx); @@ -252,15 +263,15 @@ public class DistributedDataStoreIntegrationTest { readWriteTx = dataStore.newReadWriteTransaction(); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); readWriteTx.write(carPath, car); - MapEntryNode person = PeopleModel.newPersonEntry("jack"); - YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + final MapEntryNode person = PeopleModel.newPersonEntry("jack"); + final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); readWriteTx.write(personPath, person); - Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS); + final Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS); assertEquals("exists", true, exists); Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); @@ -270,7 +281,6 @@ public class DistributedDataStoreIntegrationTest { doCommit(readWriteTx.ready()); // Verify the data in the store - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); @@ -290,10 +300,10 @@ public class DistributedDataStoreIntegrationTest { public void testSingleTransactionsWritesInQuickSuccession() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testSingleTransactionsWritesInQuickSuccession", "cars-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) { - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); @@ -310,7 +320,7 @@ public class DistributedDataStoreIntegrationTest { doCommit(writeTx.ready()); - Optional> optional = txChain.newReadOnlyTransaction() + final Optional> optional = txChain.newReadOnlyTransaction() .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); @@ -324,26 +334,25 @@ public class DistributedDataStoreIntegrationTest { throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - String shardName = "test-1"; + final String shardName = "test-1"; // Setup the InMemoryJournal to block shard recovery to ensure // the shard isn't // initialized until we create and submit the write the Tx. - String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, testName, false, shardName)) { // Create the write Tx - final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modification operations and ready the Tx on a // separate thread. - final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier .builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); @@ -351,35 +360,30 @@ public class DistributedDataStoreIntegrationTest { final AtomicReference txCohort = new AtomicReference<>(); final AtomicReference caughtEx = new AtomicReference<>(); final CountDownLatch txReady = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + final Thread txThread = new Thread(() -> { + try { + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - writeTx.merge(TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + writeTx.merge(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - writeTx.write(listEntryPath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + writeTx.write(listEntryPath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - writeTx.delete(listEntryPath); + writeTx.delete(listEntryPath); - txCohort.set(writeTx.ready()); - } catch (Exception e) { - caughtEx.set(e); - return; - } finally { - txReady.countDown(); - } + txCohort.set(writeTx.ready()); + } catch (Exception e) { + caughtEx.set(e); + } finally { + txReady.countDown(); } - }; + }); txThread.start(); // Wait for the Tx operations to complete. - - boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); if (caughtEx.get() != null) { throw caughtEx.get(); } @@ -389,16 +393,13 @@ public class DistributedDataStoreIntegrationTest { // At this point the Tx operations should be waiting for the // shard to initialize so // trigger the latch to let the shard recovery to continue. - blockRecoveryLatch.countDown(); // Wait for the Tx commit to complete. - doCommit(txCohort.get()); // Verify the data in the store - - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); @@ -429,54 +430,48 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionReadsWithShardNotInitiallyReady() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - String testName = "testTransactionReadsWithShardNotInitiallyReady"; - String shardName = "test-1"; + final String testName = "testTransactionReadsWithShardNotInitiallyReady"; + final String shardName = "test-1"; // Setup the InMemoryJournal to block shard recovery to ensure // the shard isn't // initialized until we create the Tx. - String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, testName, false, shardName)) { // Create the read-write Tx - final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", readWriteTx); // Do some reads on the Tx on a separate thread. - final AtomicReference> txExistsFuture = new AtomicReference<>(); final AtomicReference>, ReadFailedException>> txReadFuture = new AtomicReference<>(); final AtomicReference caughtEx = new AtomicReference<>(); final CountDownLatch txReadsDone = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - readWriteTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + final Thread txThread = new Thread(() -> { + try { + readWriteTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH)); + txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH)); - txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); - } catch (Exception e) { - caughtEx.set(e); - return; - } finally { - txReadsDone.countDown(); - } + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + } catch (Exception e) { + caughtEx.set(e); + } finally { + txReadsDone.countDown(); } - }; + }); txThread.start(); // Wait for the Tx operations to complete. - boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS); if (caughtEx.get() != null) { throw caughtEx.get(); @@ -487,11 +482,9 @@ public class DistributedDataStoreIntegrationTest { // At this point the Tx operations should be waiting for the // shard to initialize so // trigger the latch to let the shard recovery to continue. - blockRecoveryLatch.countDown(); // Wait for the reads to complete and verify. - assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS)); assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent()); @@ -506,74 +499,66 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionCommitFailureWithShardNotInitialized() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - String testName = "testTransactionCommitFailureWithShardNotInitialized"; - String shardName = "test-1"; + final String testName = "testTransactionCommitFailureWithShardNotInitialized"; + final String shardName = "test-1"; // Set the shard initialization timeout low for the test. - datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); // Setup the InMemoryJournal to block shard recovery // indefinitely. - - String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); - try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { - - // Create the write Tx - - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newReadWriteTransaction returned null", writeTx); - - // Do some modifications and ready the Tx on a separate - // thread. - - final AtomicReference txCohort = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReady = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - txCohort.set(writeTx.ready()); - } catch (Exception e) { - caughtEx.set(e); - return; - } finally { - txReady.countDown(); - } - } - }; + final AbstractDataStore dataStore = + setupAbstractDataStore(testParameter, testName, false, shardName); - txThread.start(); + // Create the write Tx + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); - // Wait for the Tx operations to complete. + // Do some modifications and ready the Tx on a separate + // thread. + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + final Thread txThread = new Thread(() -> { + try { + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); - if (caughtEx.get() != null) { - throw caughtEx.get(); + txCohort.set(writeTx.ready()); + } catch (Exception e) { + caughtEx.set(e); + } finally { + txReady.countDown(); } + }); - assertEquals("Tx ready", true, done); + txThread.start(); - // Wait for the commit to complete. Since the shard never - // initialized, the Tx should - // have timed out and throw an appropriate exception cause. + // Wait for the Tx operations to complete. + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if (caughtEx.get() != null) { + throw caughtEx.get(); + } - try { - txCohort.get().canCommit().get(5, TimeUnit.SECONDS); - } catch (ExecutionException e) { - Throwables.propagateIfInstanceOf(e.getCause(), Exception.class); - Throwables.propagate(e.getCause()); - } finally { - blockRecoveryLatch.countDown(); - } + assertEquals("Tx ready", true, done); + + // Wait for the commit to complete. Since the shard never + // initialized, the Tx should + // have timed out and throw an appropriate exception cause. + try { + txCohort.get().canCommit().get(5, TimeUnit.SECONDS); + fail("Expected NotInitializedException"); + } catch (final Exception e) { + final Throwable root = Throwables.getRootCause(e); + Throwables.throwIfUnchecked(root); + throw new RuntimeException(root); + } finally { + blockRecoveryLatch.countDown(); } } }; @@ -584,58 +569,50 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionReadFailureWithShardNotInitialized() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - String testName = "testTransactionReadFailureWithShardNotInitialized"; - String shardName = "test-1"; + final String testName = "testTransactionReadFailureWithShardNotInitialized"; + final String shardName = "test-1"; // Set the shard initialization timeout low for the test. - datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); // Setup the InMemoryJournal to block shard recovery // indefinitely. - - String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); - try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, testName, false, shardName)) { // Create the read-write Tx - final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", readWriteTx); // Do a read on the Tx on a separate thread. - final AtomicReference>, ReadFailedException>> txReadFuture = new AtomicReference<>(); final AtomicReference caughtEx = new AtomicReference<>(); final CountDownLatch txReadDone = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - readWriteTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + final Thread txThread = new Thread(() -> { + try { + readWriteTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); - readWriteTx.close(); - } catch (Exception e) { - caughtEx.set(e); - return; - } finally { - txReadDone.countDown(); - } + readWriteTx.close(); + } catch (Exception e) { + caughtEx.set(e); + } finally { + txReadDone.countDown(); } - }; + }); txThread.start(); // Wait for the Tx operations to complete. - boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS); if (caughtEx.get() != null) { throw caughtEx.get(); @@ -646,12 +623,13 @@ public class DistributedDataStoreIntegrationTest { // Wait for the read to complete. Since the shard never // initialized, the Tx should // have timed out and throw an appropriate exception cause. - try { txReadFuture.get().checkedGet(5, TimeUnit.SECONDS); - } catch (ReadFailedException e) { - Throwables.propagateIfInstanceOf(e.getCause(), Exception.class); - Throwables.propagate(e.getCause()); + fail("Expected NotInitializedException"); + } catch (final ReadFailedException e) { + final Throwable root = Throwables.getRootCause(e); + Throwables.throwIfUnchecked(root); + throw new RuntimeException(root); } finally { blockRecoveryLatch.countDown(); } @@ -665,7 +643,7 @@ public class DistributedDataStoreIntegrationTest { throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - String shardName = "default"; + final String shardName = "default"; // We don't want the shard to become the leader so prevent shard // elections. @@ -675,47 +653,44 @@ public class DistributedDataStoreIntegrationTest { // The ShardManager uses the election timeout for FindPrimary so // reset it low so it will timeout quickly. datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1) - .shardInitializationTimeout(200, TimeUnit.MILLISECONDS); + .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2); - try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, testName, false, shardName)) { - Object result = dataStore.getActorContext().executeOperation( + final Object result = dataStore.getActorContext().executeOperation( dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true)); assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); // Create the write Tx. - - try (DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() - : dataStore.newReadWriteTransaction()) { + DOMStoreWriteTransaction writeTxToClose = null; + try { + writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction() + : dataStore.newReadWriteTransaction(); + final DOMStoreWriteTransaction writeTx = writeTxToClose; assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modifications and ready the Tx on a separate // thread. - final AtomicReference txCohort = new AtomicReference<>(); final AtomicReference caughtEx = new AtomicReference<>(); final CountDownLatch txReady = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - writeTx.write(TestModel.JUNK_PATH, - ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); - - txCohort.set(writeTx.ready()); - } catch (Exception e) { - caughtEx.set(e); - return; - } finally { - txReady.countDown(); - } + final Thread txThread = new Thread(() -> { + try { + writeTx.write(TestModel.JUNK_PATH, + ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); + + txCohort.set(writeTx.ready()); + } catch (Exception e) { + caughtEx.set(e); + } finally { + txReady.countDown(); } - }; + }); txThread.start(); // Wait for the Tx operations to complete. - boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); if (caughtEx.get() != null) { throw caughtEx.get(); @@ -727,12 +702,26 @@ public class DistributedDataStoreIntegrationTest { // leader was elected in time, the Tx // should have timed out and throw an appropriate // exception cause. - try { - txCohort.get().canCommit().get(5, TimeUnit.SECONDS); - } catch (ExecutionException e) { - Throwables.propagateIfInstanceOf(e.getCause(), Exception.class); - Throwables.propagate(e.getCause()); + txCohort.get().canCommit().get(10, TimeUnit.SECONDS); + fail("Expected NoShardLeaderException"); + } catch (final ExecutionException e) { + final String msg = "Unexpected exception: " + + Throwables.getStackTraceAsString(e.getCause()); + if (DistributedDataStore.class.equals(testParameter)) { + assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException); + } else { + assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); + } + } + } finally { + try { + if (writeTxToClose != null) { + writeTxToClose.close(); + } + } catch (Exception e) { + // FIXME TransactionProxy.close throws IllegalStateException: + // Transaction is ready, it cannot be closed } } } @@ -740,13 +729,13 @@ public class DistributedDataStoreIntegrationTest { }; } - @Test(expected = NoShardLeaderException.class) + @Test public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception { datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader"); } - @Test(expected = NoShardLeaderException.class) + @Test public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception { testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader"); } @@ -755,15 +744,15 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionAbort() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest", - "test-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "transactionAbortIntegrationTest", "test-1")) { - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); cohort.canCommit().get(5, TimeUnit.SECONDS); @@ -781,49 +770,41 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionChainWithSingleShard() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", - "test-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testTransactionChainWithSingleShard", "test-1")) { // 1. Create a Tx chain and write-only Tx + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); // 2. Write some data - - NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); writeTx.write(TestModel.TEST_PATH, testNode); // 3. Ready the Tx for commit - final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); // 4. Commit the Tx on another thread that first waits for // the second read Tx. - final CountDownLatch continueCommit1 = new CountDownLatch(1); final CountDownLatch commit1Done = new CountDownLatch(1); final AtomicReference commit1Error = new AtomicReference<>(); - new Thread() { - @Override - public void run() { - try { - continueCommit1.await(); - doCommit(cohort1); - } catch (Exception e) { - commit1Error.set(e); - } finally { - commit1Done.countDown(); - } + new Thread(() -> { + try { + continueCommit1.await(); + doCommit(cohort1); + } catch (Exception e) { + commit1Error.set(e); + } finally { + commit1Done.countDown(); } - }.start(); + }).start(); // 5. Create a new read Tx from the chain to read and verify // the data from the first // Tx is visible after being readied. - DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); @@ -831,9 +812,8 @@ public class DistributedDataStoreIntegrationTest { // 6. Create a new RW Tx from the chain, write more data, // and ready it - - DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); + final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); @@ -841,7 +821,6 @@ public class DistributedDataStoreIntegrationTest { // 7. Create a new read Tx from the chain to read the data // from the last RW Tx to // verify it is visible. - readTx = txChain.newReadWriteTransaction(); optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); @@ -849,7 +828,6 @@ public class DistributedDataStoreIntegrationTest { // 8. Wait for the 2 commits to complete and close the // chain. - continueCommit1.countDown(); Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); @@ -863,7 +841,6 @@ public class DistributedDataStoreIntegrationTest { // 9. Create a new read Tx from the data store and verify // committed data. - readTx = dataStore.newReadOnlyTransaction(); optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); @@ -877,10 +854,10 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionChainWithMultipleShards() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testTransactionChainWithMultipleShards", "cars-1", "people-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) { - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); @@ -893,14 +870,14 @@ public class DistributedDataStoreIntegrationTest { final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); + final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); readWriteTx.write(carPath, car); - MapEntryNode person = PeopleModel.newPersonEntry("jack"); - YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + final MapEntryNode person = PeopleModel.newPersonEntry("jack"); + final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); readWriteTx.merge(personPath, person); Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); @@ -911,16 +888,16 @@ public class DistributedDataStoreIntegrationTest { assertEquals("isPresent", true, optional.isPresent()); assertEquals("Data node", person, optional.get()); - DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); writeTx = txChain.newWriteOnlyTransaction(); writeTx.delete(carPath); - DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); - ListenableFuture canCommit1 = cohort1.canCommit(); - ListenableFuture canCommit2 = cohort2.canCommit(); + final ListenableFuture canCommit1 = cohort1.canCommit(); + final ListenableFuture canCommit2 = cohort2.canCommit(); doCommit(canCommit1, cohort1); doCommit(canCommit2, cohort2); @@ -928,7 +905,7 @@ public class DistributedDataStoreIntegrationTest { txChain.close(); - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); assertEquals("isPresent", false, optional.isPresent()); @@ -945,27 +922,27 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionsInQuickSuccession() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionsInQuickSuccession", "cars-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) { - ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder() .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); - TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); DOMTransactionChain txChain = broker.createTransactionChain(listener); - List> futures = new ArrayList<>(); + final List> futures = new ArrayList<>(); - DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); futures.add(writeTx.submit()); int numCars = 100; for (int i = 0; i < numCars; i++) { - DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + final DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); @@ -973,11 +950,11 @@ public class DistributedDataStoreIntegrationTest { futures.add(rwTx.submit()); } - for (CheckedFuture f : futures) { + for (final CheckedFuture f : futures) { f.checkedGet(); } - Optional> optional = txChain.newReadOnlyTransaction() + final Optional> optional = txChain.newReadOnlyTransaction() .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); @@ -994,18 +971,19 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) { - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); + final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); rwTx1.ready(); - DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); + final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); - Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + final Optional> optional = rwTx2.read(TestModel.TEST_PATH).get( + 5, TimeUnit.SECONDS); assertEquals("isPresent", false, optional.isPresent()); txChain.close(); @@ -1018,12 +996,12 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -1031,7 +1009,6 @@ public class DistributedDataStoreIntegrationTest { // Try to create another Tx of each type - each should fail // b/c the previous Tx wasn't // readied. - assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); } } @@ -1042,16 +1019,14 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionAfterClose() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionAfterClose", "test-1")) { - - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testCreateChainedTransactionAfterClose", "test-1")) { + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); txChain.close(); // Try to create another Tx of each type - should fail b/c // the previous Tx was closed. - assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class); } } @@ -1062,19 +1037,17 @@ public class DistributedDataStoreIntegrationTest { public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); // Create a write tx and submit. - - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); // Create read-only tx's and issue a read. - CheckedFuture>, ReadFailedException> readFuture1 = txChain .newReadOnlyTransaction().read(TestModel.TEST_PATH); @@ -1082,7 +1055,6 @@ public class DistributedDataStoreIntegrationTest { .newReadOnlyTransaction().read(TestModel.TEST_PATH); // Create another write tx and issue the write. - DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction(); writeTx2.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); @@ -1093,7 +1065,6 @@ public class DistributedDataStoreIntegrationTest { assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent()); // Ensure the writes succeed. - DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); doCommit(cohort1); @@ -1110,33 +1081,36 @@ public class DistributedDataStoreIntegrationTest { public void testChainedTransactionFailureWithSingleShard() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testChainedTransactionFailureWithSingleShard", "cars-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) { - ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder() .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); - TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); - DOMTransactionChain txChain = broker.createTransactionChain(listener); + final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + final DOMTransactionChain txChain = broker.createTransactionChain(listener); + + final DOMDataReadWriteTransaction writeTx = txChain.newReadWriteTransaction(); - DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, + PeopleModel.emptyContainer()); - ContainerNode invalidData = ImmutableContainerNodeBuilder.create() + final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); try { - rwTx.submit().checkedGet(5, TimeUnit.SECONDS); + writeTx.submit().checkedGet(5, TimeUnit.SECONDS); fail("Expected TransactionCommitFailedException"); - } catch (TransactionCommitFailedException e) { + } catch (final TransactionCommitFailedException e) { // Expected } - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); txChain.close(); @@ -1150,35 +1124,35 @@ public class DistributedDataStoreIntegrationTest { public void testChainedTransactionFailureWithMultipleShards() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore( - "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) { - ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder() .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); - TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); - DOMTransactionChain txChain = broker.createTransactionChain(listener); + final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + final DOMTransactionChain txChain = broker.createTransactionChain(listener); - DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + final DOMDataReadWriteTransaction writeTx = txChain.newReadWriteTransaction(); writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - ContainerNode invalidData = ImmutableContainerNodeBuilder.create() + final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + // Note that merge will validate the data and fail but put // succeeds b/c deep validation is not // done for put for performance reasons. - writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { writeTx.submit().checkedGet(5, TimeUnit.SECONDS); fail("Expected TransactionCommitFailedException"); - } catch (TransactionCommitFailedException e) { + } catch (final TransactionCommitFailedException e) { // Expected } @@ -1196,27 +1170,28 @@ public class DistributedDataStoreIntegrationTest { public void testChangeListenerRegistration() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (AbstractDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration", - "test-1")) { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testChangeListenerRegistration", "test-1")) { testWriteTransaction(dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - MockDataChangeListener listener = new MockDataChangeListener(1); + final MockDataChangeListener listener = new MockDataChangeListener(1); - ListenerRegistration listenerReg = dataStore + final ListenerRegistration listenerReg = dataStore .registerChangeListener(TestModel.TEST_PATH, listener, DataChangeScope.SUBTREE); assertNotNull("registerChangeListener returned null", listenerReg); - // Wait for the initial notification + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getDataChangeListenerActors", 1, + state.getDataChangeListenerActors().size())); + // Wait for the initial notification listener.waitForChangeEvents(TestModel.TEST_PATH); - listener.reset(2); // Write 2 updates. - testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); @@ -1226,11 +1201,13 @@ public class DistributedDataStoreIntegrationTest { ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); // Wait for the 2 updates. - listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); - listenerReg.close(); + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getDataChangeListenerActors", 0, + state.getDataChangeListenerActors().size())); + testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), @@ -1243,90 +1220,106 @@ public class DistributedDataStoreIntegrationTest { } @Test - public void testRestoreFromDatastoreSnapshot() throws Exception { + public void testDataTreeChangeListenerRegistration() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - final String name = "transactionIntegrationTest"; + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testDataTreeChangeListenerRegistration", "test-1")) { - ContainerNode carsNode = CarsModel.newCarsNode( - CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)), - CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L)))); + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - dataTree.setSchemaContext(SchemaContextHelper.full()); - AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); - NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - final Snapshot carsSnapshot = Snapshot.create( - new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), - Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + ListenerRegistration listenerReg = dataStore + .registerTreeChangeListener(TestModel.TEST_PATH, listener); - NormalizedNode peopleNode = PeopleModel.create(); - dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - dataTree.setSchemaContext(SchemaContextHelper.full()); - AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); - root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); + assertNotNull("registerTreeChangeListener returned null", listenerReg); - Snapshot peopleSnapshot = Snapshot.create( - new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), - Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getTreeChangeListenerActors", 1, + state.getTreeChangeListenerActors().size())); - restoreFromSnapshot = new DatastoreSnapshot(name, null, - Arrays.asList( - new DatastoreSnapshot.ShardSnapshot("cars", - org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)), - new DatastoreSnapshot.ShardSnapshot("people", - org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot)))); + // Wait for the initial notification + listener.waitForChangeEvents(TestModel.TEST_PATH); + listener.reset(2); - try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf", - true, "cars", "people")) { + // Write 2 updates. + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); + testWriteTransaction(dataStore, listPath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", carsNode, optional.get()); + // Wait for the 2 updates. + listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); + listenerReg.close(); - optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", peopleNode, optional.get()); + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getTreeChangeListenerActors", 0, + state.getTreeChangeListenerActors().size())); + + testWriteTransaction(dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); + + listener.expectNoMoreChanges("Received unexpected change after close"); } } }; } @Test - @Deprecated - public void testRecoveryFromPreCarbonSnapshot() throws Exception { + public void testRestoreFromDatastoreSnapshot() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - final String name = "testRecoveryFromPreCarbonSnapshot"; + final String name = "transactionIntegrationTest"; - ContainerNode carsNode = CarsModel.newCarsNode( + final ContainerNode carsNode = CarsModel.newCarsNode( CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)), CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L)))); - DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - dataTree.setSchemaContext(SchemaContextHelper.full()); + DataTree dataTree = new InMemoryDataTreeFactory().create( + DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); - final ByteArrayOutputStream bos = new ByteArrayOutputStream(); - new MetadataShardDataTreeSnapshot(root).serialize(bos); - final org.opendaylight.controller.cluster.raft.Snapshot snapshot = - org.opendaylight.controller.cluster.raft.Snapshot.create(bos.toByteArray(), - Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + final Snapshot carsSnapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + + dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, + SchemaContextHelper.full()); - InMemorySnapshotStore.addSnapshot("member-1-shard-cars-" + name, snapshot); + final NormalizedNode peopleNode = PeopleModel.create(); + AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); - try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf", - true, "cars")) { + root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + final Snapshot peopleSnapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + + restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList( + new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot), + new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot))); + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, name, "module-shards-member1.conf", true, "cars", "people")) { + + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + // two reads Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); assertEquals("Data node", carsNode, optional.get()); + + optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", peopleNode, optional.get()); } } };