X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=6fead6bbad2d0166d1183ba72146b45ee318139c;hp=526b1d47e11d2c2f1123f6597261392fed053691;hb=3115b8171461584e85f58d87a9f179013cfbb262;hpb=7e62b4a59f9e43bcd0585845f1aeb55c44199f27 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 526b1d47e1..6fead6bbad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -7,12 +7,14 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -29,6 +31,8 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -37,15 +41,18 @@ import java.math.BigInteger; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.junit.After; import org.junit.Assume; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -57,7 +64,9 @@ import org.opendaylight.controller.cluster.access.client.RequestTimeoutException import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; +import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -67,6 +76,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; @@ -81,9 +92,9 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.common.api.TransactionChainListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; import org.opendaylight.mdsal.dom.spi.store.DOMStore; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; @@ -117,7 +128,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 } + { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 } }); } @@ -208,7 +219,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerDistributedDataStore = followerTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards); leaderTestKit.waitForMembersUp("member-2"); followerTestKit.waitForMembersUp("member-1"); @@ -217,7 +228,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) throws Exception { final Optional> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + assertTrue("isPresent", optional.isPresent()); final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( CarsModel.CAR_QNAME); @@ -231,14 +242,14 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path, final NormalizedNode expNode) throws Exception { final Optional> optional = readTx.read(path).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", expNode, optional.get()); } private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path) throws Exception { final Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS); - assertEquals("exists", true, exists); + assertEquals("exists", Boolean.TRUE, exists); } @Test @@ -315,8 +326,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } - TestKit.shutdownActorSystem(leaderSystem, Boolean.TRUE); - TestKit.shutdownActorSystem(followerSystem, Boolean.TRUE); + TestKit.shutdownActorSystem(leaderSystem, true); + TestKit.shutdownActorSystem(followerSystem, true); final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2"); @@ -327,6 +338,131 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } + @Test + public void testSingleTransactionsWritesInQuickSuccession() throws Exception { + final String testName = "testWriteTransactionWithSingleShard"; + initDatastoresWithCars(testName); + + final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + + followerTestKit.doCommit(writeTx.ready()); + + DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); + domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + + domStoreReadTransaction.close(); + } + + // wait to let the shard catch up with purged + await("Range set leak test").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Optional localShard = + leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard.get(), new RequestFrontendMetadata()); + + if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + Iterator iterator = + frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); + FrontendHistoryMetadata metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + assertEquals(0, metadata.getClosedTransactions().size()); + assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)), + metadata.getPurgedTransactions().asRanges().iterator().next()); + } else { + // ask based should track no metadata + assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + } + }); + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + } + + @Test + @Ignore("Flushes out tell based leak needs to be handled separately") + public void testCloseTransactionMetadataLeak() throws Exception { + // Ask based frontend seems to have some issues with back to back close + Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class)); + + final String testName = "testWriteTransactionWithSingleShard"; + initDatastoresWithCars(testName); + + final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.close(); + + DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); + domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + + domStoreReadTransaction.close(); + } + + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + // wait to let the shard catch up with purged + await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Optional localShard = + leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard.get(), new RequestFrontendMetadata()); + + if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + Iterator iterator = + frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); + FrontendHistoryMetadata metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + Set> ranges = metadata.getPurgedTransactions().asRanges(); + + assertEquals(0, metadata.getClosedTransactions().size()); + assertEquals(1, ranges.size()); + } else { + // ask based should track no metadata + assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + } + }); + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + } + @Test public void testReadWriteTransactionWithSingleShard() throws Exception { initDatastoresWithCars("testReadWriteTransactionWithSingleShard"); @@ -476,11 +612,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { readWriteTx.merge(personPath, person); Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", car, optional.get()); optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", person, optional.get()); final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); @@ -500,7 +636,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyCars(readTx, car); optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", false, optional.isPresent()); + assertFalse("isPresent", optional.isPresent()); } @Test @@ -512,7 +648,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); @@ -545,7 +681,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); @@ -601,7 +737,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { TestKit.shutdownActorSystem(leaderSystem, true); Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); - followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS); + followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorUtils(), CARS); leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS); @@ -614,7 +750,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { newMember1TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) { - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS); // Write a car entry to the new leader - should switch to local Tx @@ -634,11 +770,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader"); - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars"); - final com.google.common.base.Optional carsFollowerShard = - followerDistributedDataStore.getActorContext().findLocalShard("cars"); - assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent()); + final Optional carsFollowerShard = + followerDistributedDataStore.getActorUtils().findLocalShard("cars"); + assertTrue("Cars follower shard found", carsFollowerShard.isPresent()); final DataTree dataTree = new InMemoryDataTreeFactory().create( DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); @@ -653,8 +789,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); modification.ready(); - ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, - java.util.Optional.empty()); + ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -673,7 +808,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); modification.ready(); - readyLocal = new ReadyLocalTransaction(tx2 , modification, false, java.util.Optional.empty()); + readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -683,13 +818,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); - final ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); final Supplier versionSupplier = Mockito.mock(Supplier.class); Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList( + leaderDistributedDataStore.getActorUtils(), Arrays.asList( new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); @@ -702,11 +837,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testForwardedReadyTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader"); - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars"); - final com.google.common.base.Optional carsFollowerShard = - followerDistributedDataStore.getActorContext().findLocalShard("cars"); - assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent()); + final Optional carsFollowerShard = + followerDistributedDataStore.getActorUtils().findLocalShard("cars"); + assertTrue("Cars follower shard found", carsFollowerShard.isPresent()); carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class); @@ -723,7 +858,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true, - java.util.Optional.empty()); + Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -744,7 +879,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false, - java.util.Optional.empty()); + Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -754,13 +889,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); - ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); final Supplier versionSupplier = Mockito.mock(Supplier.class); Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList( + leaderDistributedDataStore.getActorUtils(), Arrays.asList( new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); @@ -771,8 +906,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionForwardedToLeaderAfterRetry() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); followerDatastoreContextBuilder.shardBatchedModificationCount(2); leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); @@ -811,7 +946,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; - NormalizedNode people = PeopleModel.newPersonMapNode(); + NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME) + .withChild(PeopleModel.newPersonEntry("Dude")).build(); writeTx2.write(PeopleModel.PERSON_LIST_PATH, people); final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); @@ -842,7 +978,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount())); + stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount())); // Disable elections on the leader so it switches to follower. @@ -850,7 +986,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()) .shardElectionTimeoutFactor(10)); - leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars"); + leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorUtils(), "cars"); // Submit all tx's - the messages should get queued for retry. @@ -864,9 +1000,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1)); - IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "cars") + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "cars") .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); - IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "people") + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "people") .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort); @@ -882,15 +1018,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testLeadershipTransferOnShutdown() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.shardBatchedModificationCount(1); followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); final String testName = "testLeadershipTransferOnShutdown"; initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE); final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, - DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100), + DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500), commitTimeout); try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) { @@ -923,7 +1059,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .shardElectionTimeoutFactor(100)); final FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); - final Future future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars"); + final Future future = leaderDistributedDataStore.getActorUtils().findLocalShardAsync("cars"); final ActorRef leaderActor = Await.result(future, duration); final Future stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE); @@ -947,8 +1083,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithIsolatedLeader() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); // Set the isolated leader check interval high so we can control the switch to IsolatedLeader. leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000); final String testName = "testTransactionWithIsolatedLeader"; @@ -967,9 +1103,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()); // Stop the follower - followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager()); + followerTestKit.watch(followerDistributedDataStore.getActorUtils().getShardManager()); followerDistributedDataStore.close(); - followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager()); + followerTestKit.expectTerminated(followerDistributedDataStore.getActorUtils().getShardManager()); // Submit the preIsolatedLeaderWriteTx so it's pending final DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready(); @@ -1027,7 +1163,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { fail("Exception expected"); } catch (final ExecutionException e) { final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.equals(testParameter)) { + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException || e.getCause() instanceof ShardLeaderNotRespondingException); } else { @@ -1066,7 +1202,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { fail("Exception expected"); } catch (final ExecutionException e) { final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.equals(testParameter)) { + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); } else { assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); @@ -1143,7 +1279,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final Optional> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read( CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, readOptional.isPresent()); + assertTrue("isPresent", readOptional.isPresent()); assertEquals("Node", carsNode, readOptional.get()); verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class), @@ -1156,7 +1292,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadWriteMessageSlicing() throws Exception { // The slicing is only implemented for tell-based protocol - Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class)); + Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.maximumMessageSliceSize(100); followerDatastoreContextBuilder.maximumMessageSliceSize(100);