X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=95149a1809864545e6a163bf66450d7349b7ee11;hb=45371f6048ab259f7ed536962bd081d1eb5ae2ef;hp=c52e4a39a2bd845a1952051bd1ae774f96b00311;hpb=e345c2a17f737d537cda45b0f737dff417e3b359;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index c52e4a39a2..95149a1809 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -7,12 +7,14 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -24,12 +26,12 @@ import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.dispatch.Futures; import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; -import com.google.common.base.Optional; +import akka.testkit.javadsl.TestKit; import com.google.common.base.Stopwatch; -import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -38,14 +40,19 @@ import java.math.BigInteger; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import org.junit.After; import org.junit.Assume; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -53,10 +60,13 @@ import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; import org.mockito.Mockito; import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; +import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -66,6 +76,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; @@ -79,26 +91,24 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; +import org.opendaylight.mdsal.dom.spi.store.DOMStore; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; @@ -118,7 +128,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 60 } + { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 } }); } @@ -138,6 +148,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf"; private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf"; private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf"; + private static final String MODULE_SHARDS_CARS_1_2_3 = "module-shards-cars-member-1-and-2-and-3.conf"; private ActorSystem leaderSystem; private ActorSystem followerSystem; @@ -181,9 +192,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderDistributedDataStore.close(); } - JavaTestKit.shutdownActorSystem(leaderSystem); - JavaTestKit.shutdownActorSystem(followerSystem); - JavaTestKit.shutdownActorSystem(follower2System); + TestKit.shutdownActorSystem(leaderSystem); + TestKit.shutdownActorSystem(followerSystem); + TestKit.shutdownActorSystem(follower2System); InMemoryJournal.clear(); InMemorySnapshotStore.clear(); @@ -208,7 +219,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerDistributedDataStore = followerTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards); leaderTestKit.waitForMembersUp("member-2"); followerTestKit.waitForMembersUp("member-1"); @@ -217,7 +228,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) throws Exception { final Optional> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + assertTrue("isPresent", optional.isPresent()); final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( CarsModel.CAR_QNAME); @@ -231,14 +242,14 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path, final NormalizedNode expNode) throws Exception { final Optional> optional = readTx.read(path).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", expNode, optional.get()); } private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path) throws Exception { final Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS); - assertEquals("exists", true, exists); + assertEquals("exists", Boolean.TRUE, exists); } @Test @@ -315,8 +326,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } - JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE); - JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE); + TestKit.shutdownActorSystem(leaderSystem, true); + TestKit.shutdownActorSystem(followerSystem, true); final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2"); @@ -327,6 +338,131 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } + @Test + public void testSingleTransactionsWritesInQuickSuccession() throws Exception { + final String testName = "testWriteTransactionWithSingleShard"; + initDatastoresWithCars(testName); + + final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + + followerTestKit.doCommit(writeTx.ready()); + + DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); + domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + + domStoreReadTransaction.close(); + } + + // wait to let the shard catch up with purged + await("Range set leak test").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Optional localShard = + leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard.get(), new RequestFrontendMetadata()); + + if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + Iterator iterator = + frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); + FrontendHistoryMetadata metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + assertEquals(0, metadata.getClosedTransactions().size()); + assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)), + metadata.getPurgedTransactions().asRanges().iterator().next()); + } else { + // ask based should track no metadata + assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + } + }); + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + } + + @Test + @Ignore("Flushes out tell based leak needs to be handled separately") + public void testCloseTransactionMetadataLeak() throws Exception { + // Ask based frontend seems to have some issues with back to back close + Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class)); + + final String testName = "testWriteTransactionWithSingleShard"; + initDatastoresWithCars(testName); + + final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.close(); + + DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); + domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + + domStoreReadTransaction.close(); + } + + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + // wait to let the shard catch up with purged + await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Optional localShard = + leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard.get(), new RequestFrontendMetadata()); + + if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + Iterator iterator = + frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); + FrontendHistoryMetadata metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + Set> ranges = metadata.getPurgedTransactions().asRanges(); + + assertEquals(0, metadata.getClosedTransactions().size()); + assertEquals(1, ranges.size()); + } else { + // ask based should track no metadata + assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + } + }); + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + } + @Test public void testReadWriteTransactionWithSingleShard() throws Exception { initDatastoresWithCars("testReadWriteTransactionWithSingleShard"); @@ -476,11 +612,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { readWriteTx.merge(personPath, person); Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", car, optional.get()); optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", person, optional.get()); final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); @@ -500,7 +636,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyCars(readTx, car); optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", false, optional.isPresent()); + assertFalse("isPresent", optional.isPresent()); } @Test @@ -512,10 +648,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); - final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) @@ -524,9 +660,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); try { - writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + writeTx.commit().get(5, TimeUnit.SECONDS); fail("Expected TransactionCommitFailedException"); - } catch (final TransactionCommitFailedException e) { + } catch (final ExecutionException e) { // Expected } @@ -545,10 +681,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); - final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); @@ -561,9 +697,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); try { - writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + writeTx.commit().get(5, TimeUnit.SECONDS); fail("Expected TransactionCommitFailedException"); - } catch (final TransactionCommitFailedException e) { + } catch (final ExecutionException e) { // Expected } @@ -575,6 +711,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testSingleShardTransactionsWithLeaderChanges() throws Exception { + followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2); final String testName = "testSingleShardTransactionsWithLeaderChanges"; initDatastoresWithCars(testName); @@ -597,10 +734,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder .shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); - JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + TestKit.shutdownActorSystem(leaderSystem, true); Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); - followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS); + followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorUtils(), CARS); leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS); @@ -613,7 +750,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { newMember1TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) { - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS); // Write a car entry to the new leader - should switch to local Tx @@ -633,14 +770,14 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader"); - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars"); - final Optional carsFollowerShard = followerDistributedDataStore.getActorContext() - .findLocalShard("cars"); - assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent()); + final Optional carsFollowerShard = + followerDistributedDataStore.getActorUtils().findLocalShard("cars"); + assertTrue("Cars follower shard found", carsFollowerShard.isPresent()); - final TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - dataTree.setSchemaContext(SchemaContextHelper.full()); + final DataTree dataTree = new InMemoryDataTreeFactory().create( + DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); // Send a tx with immediate commit. @@ -652,7 +789,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); modification.ready(); - ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true); + ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -671,7 +808,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); modification.ready(); - readyLocal = new ReadyLocalTransaction(tx2 , modification, false); + readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -681,13 +818,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); - final ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); final Supplier versionSupplier = Mockito.mock(Supplier.class); Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList( + leaderDistributedDataStore.getActorUtils(), Arrays.asList( new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); @@ -700,11 +837,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testForwardedReadyTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader"); - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars"); - final Optional carsFollowerShard = followerDistributedDataStore.getActorContext() - .findLocalShard("cars"); - assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent()); + final Optional carsFollowerShard = + followerDistributedDataStore.getActorUtils().findLocalShard("cars"); + assertTrue("Cars follower shard found", carsFollowerShard.isPresent()); carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class); @@ -720,7 +857,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true); + Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true, + Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -740,7 +878,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false); + Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false, + Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -750,13 +889,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); - ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); final Supplier versionSupplier = Mockito.mock(Supplier.class); Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList( + leaderDistributedDataStore.getActorUtils(), Arrays.asList( new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); @@ -767,8 +906,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionForwardedToLeaderAfterRetry() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); followerDatastoreContextBuilder.shardBatchedModificationCount(2); leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); @@ -783,10 +922,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Wait for the commit to be replicated to the follower. MemberNode.verifyRaftState(followerDistributedDataStore, "cars", - raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied())); + raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied())); MemberNode.verifyRaftState(followerDistributedDataStore, "people", - raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied())); + raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied())); // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in // the leader shard. @@ -807,7 +946,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; - NormalizedNode people = PeopleModel.newPersonMapNode(); + NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME) + .withChild(PeopleModel.newPersonEntry("Dude")).build(); writeTx2.write(PeopleModel.PERSON_LIST_PATH, people); final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); @@ -838,7 +978,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount())); + stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount())); // Disable elections on the leader so it switches to follower. @@ -846,7 +986,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()) .shardElectionTimeoutFactor(10)); - leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars"); + leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorUtils(), "cars"); // Submit all tx's - the messages should get queued for retry. @@ -860,9 +1000,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1)); - IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "cars") + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "cars") .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); - IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "people") + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "people") .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort); @@ -878,15 +1018,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testLeadershipTransferOnShutdown() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.shardBatchedModificationCount(1); followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); final String testName = "testLeadershipTransferOnShutdown"; initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE); final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, - DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100), + DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500), commitTimeout); try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) { @@ -919,7 +1059,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .shardElectionTimeoutFactor(100)); final FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); - final Future future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars"); + final Future future = leaderDistributedDataStore.getActorUtils().findLocalShardAsync("cars"); final ActorRef leaderActor = Await.result(future, duration); final Future stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE); @@ -943,8 +1083,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithIsolatedLeader() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); // Set the isolated leader check interval high so we can control the switch to IsolatedLeader. leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000); final String testName = "testTransactionWithIsolatedLeader"; @@ -963,9 +1103,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()); // Stop the follower - followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager()); + followerTestKit.watch(followerDistributedDataStore.getActorUtils().getShardManager()); followerDistributedDataStore.close(); - followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager()); + followerTestKit.expectTerminated(followerDistributedDataStore.getActorUtils().getShardManager()); // Submit the preIsolatedLeaderWriteTx so it's pending final DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready(); @@ -998,17 +1138,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithShardLeaderNotResponding() throws Exception { + followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2); followerDatastoreContextBuilder.shardElectionTimeoutFactor(50); initDatastoresWithCars("testTransactionWithShardLeaderNotResponding"); // Do an initial read to get the primary shard info cached. final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); - readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); // Shutdown the leader and try to create a new tx. - JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + TestKit.shutdownActorSystem(leaderSystem, true); followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1); sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); @@ -1022,23 +1163,28 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { fail("Exception expected"); } catch (final ExecutionException e) { final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException - || e.getCause() instanceof ShardLeaderNotRespondingException); + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { + assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException + || e.getCause() instanceof ShardLeaderNotRespondingException); + } else { + assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); + } } } @Test public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception { + followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2); initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader"); // Do an initial read to get the primary shard info cached. final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); - readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); // Shutdown the leader and try to create a new tx. - JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + TestKit.shutdownActorSystem(leaderSystem, true); Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); @@ -1055,25 +1201,29 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerTestKit.doCommit(rwTx.ready()); fail("Exception expected"); } catch (final ExecutionException e) { - final String msg = "Expected instance of NoShardLeaderException, actual: \n" - + Throwables.getStackTraceAsString(e.getCause()); - assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); + final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { + assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); + } else { + assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); + } } } @Test public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception { + followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2); String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx"; - initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS); + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder() - .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10); final IntegrationTestKit follower2TestKit = new IntegrationTestKit( follower2System, follower2DatastoreContextBuilder, commitTimeout); try (AbstractDataStore ds = follower2TestKit.setupAbstractDataStore( - testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) { + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) { followerTestKit.waitForMembersUp("member-1", "member-3"); follower2TestKit.waitForMembersUp("member-1", "member-2"); @@ -1081,16 +1231,16 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Do an initial read to get the primary shard info cached. final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); - readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); // Shutdown the leader and try to create a new tx. - JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + TestKit.shutdownActorSystem(leaderSystem, true); Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder - .operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); + .operationTimeoutInMillis(500).shardElectionTimeoutFactor(5).customRaftPolicyImplementation(null)); final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); @@ -1109,8 +1259,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should // install a snapshot to sync the follower. - TipProducingDataTree tree = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION); - tree.setSchemaContext(SchemaContextHelper.full()); + DataTree tree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_CONFIGURATION, + SchemaContextHelper.full()); final ContainerNode carsNode = CarsModel.newCarsNode( CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)))); @@ -1128,8 +1278,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastoresWithCars(testName); final Optional> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read( - CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, readOptional.isPresent()); + CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", readOptional.isPresent()); assertEquals("Node", carsNode, readOptional.get()); verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class), @@ -1139,6 +1289,23 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initialSnapshot, snapshotRoot); } + @Test + public void testReadWriteMessageSlicing() throws Exception { + // The slicing is only implemented for tell-based protocol + Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); + + leaderDatastoreContextBuilder.maximumMessageSliceSize(100); + followerDatastoreContextBuilder.maximumMessageSliceSize(100); + initDatastoresWithCars("testLargeReadReplySlicing"); + + final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); + + final NormalizedNode carsNode = CarsModel.create(); + rwTx.write(CarsModel.BASE_PATH, carsNode); + + verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());