X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=0c74c71d832988b5959f1119da2c5e9016b98985;hp=ab9dab44e1421b9bb473f2998059847b469d2038;hb=33877f41ffc3f8eb36ad8490315419b90817d26e;hpb=126e4127773caa08cd36cd5a9d4cc137620002c2 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index ab9dab44e1..0c74c71d83 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -23,30 +25,38 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; +import akka.cluster.Member; import akka.dispatch.Futures; import akka.pattern.Patterns; import akka.testkit.javadsl.TestKit; import com.google.common.base.Stopwatch; -import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; -import java.math.BigInteger; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import org.junit.After; import org.junit.Assume; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -58,7 +68,11 @@ import org.opendaylight.controller.cluster.access.client.RequestTimeoutException import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; +import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; +import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages; +import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -68,10 +82,16 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -91,6 +111,7 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.Uint64; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; @@ -118,7 +139,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 } + { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 } }); } @@ -209,7 +230,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerDistributedDataStore = followerTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards); leaderTestKit.waitForMembersUp("member-2"); followerTestKit.waitForMembersUp("member-1"); @@ -255,11 +276,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); writeTx.merge(car1Path, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage"); writeTx.merge(car2Path, car2); @@ -328,6 +349,131 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } + @Test + public void testSingleTransactionsWritesInQuickSuccession() throws Exception { + final String testName = "testWriteTransactionWithSingleShard"; + initDatastoresWithCars(testName); + + final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); + + followerTestKit.doCommit(writeTx.ready()); + + DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); + domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + + domStoreReadTransaction.close(); + } + + // wait to let the shard catch up with purged + await("Range set leak test").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Optional localShard = + leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard.get(), new RequestFrontendMetadata()); + + if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + Iterator iterator = + frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); + FrontendHistoryMetadata metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + assertEquals(0, metadata.getClosedTransactions().size()); + assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)), + metadata.getPurgedTransactions().asRanges().iterator().next()); + } else { + // ask based should track no metadata + assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + } + }); + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + } + + @Test + @Ignore("Flushes out tell based leak needs to be handled separately") + public void testCloseTransactionMetadataLeak() throws Exception { + // Ask based frontend seems to have some issues with back to back close + Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class)); + + final String testName = "testWriteTransactionWithSingleShard"; + initDatastoresWithCars(testName); + + final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.close(); + + DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); + domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + + domStoreReadTransaction.close(); + } + + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + // wait to let the shard catch up with purged + await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Optional localShard = + leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard.get(), new RequestFrontendMetadata()); + + if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + Iterator iterator = + frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); + FrontendHistoryMetadata metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + Set> ranges = metadata.getPurgedTransactions().asRanges(); + + assertEquals(0, metadata.getClosedTransactions().size()); + assertEquals(1, ranges.size()); + } else { + // ask based should track no metadata + assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + } + }); + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + } + @Test public void testReadWriteTransactionWithSingleShard() throws Exception { initDatastoresWithCars("testReadWriteTransactionWithSingleShard"); @@ -338,12 +484,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); rwTx.merge(CarsModel.newCarPath("optima"), car1); verifyCars(rwTx, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage"); rwTx.merge(car2Path, car2); @@ -427,7 +573,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); rwTx.write(car1Path, car1); @@ -435,7 +581,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyCars(rwTx, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); rwTx.merge(CarsModel.newCarPath("sportage"), car2); rwTx.delete(car1Path); @@ -468,7 +614,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); readWriteTx.write(carPath, car); @@ -602,7 +748,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { TestKit.shutdownActorSystem(leaderSystem, true); Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); - followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS); + followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorUtils(), CARS); leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS); @@ -615,13 +761,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { newMember1TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) { - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS); // Write a car entry to the new leader - should switch to local Tx writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); writeTx.merge(car1Path, car1); @@ -635,10 +781,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader"); - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars"); - final com.google.common.base.Optional carsFollowerShard = - followerDistributedDataStore.getActorContext().findLocalShard("cars"); + final Optional carsFollowerShard = + followerDistributedDataStore.getActorUtils().findLocalShard("cars"); assertTrue("Cars follower shard found", carsFollowerShard.isPresent()); final DataTree dataTree = new InMemoryDataTreeFactory().create( @@ -650,12 +796,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); modification.ready(); - ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, - java.util.Optional.empty()); + ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -670,11 +815,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Send another tx without immediate commit. modification = dataTree.takeSnapshot().newModification(); - MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000)); new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); modification.ready(); - readyLocal = new ReadyLocalTransaction(tx2 , modification, false, java.util.Optional.empty()); + readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -684,13 +829,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); - final ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); final Supplier versionSupplier = Mockito.mock(Supplier.class); Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList( + leaderDistributedDataStore.getActorUtils(), Arrays.asList( new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); @@ -703,10 +848,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testForwardedReadyTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader"); - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars"); - final com.google.common.base.Optional carsFollowerShard = - followerDistributedDataStore.getActorContext().findLocalShard("cars"); + final Optional carsFollowerShard = + followerDistributedDataStore.getActorUtils().findLocalShard("cars"); assertTrue("Cars follower shard found", carsFollowerShard.isPresent()); carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); @@ -718,13 +863,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true, - java.util.Optional.empty()); + Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -739,13 +884,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Send another tx without immediate commit. modification = dataTree.takeSnapshot().newModification(); - MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000)); new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false, - java.util.Optional.empty()); + Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -755,13 +900,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); - ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); final Supplier versionSupplier = Mockito.mock(Supplier.class); Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList( + leaderDistributedDataStore.getActorUtils(), Arrays.asList( new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); @@ -772,8 +917,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionForwardedToLeaderAfterRetry() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); followerDatastoreContextBuilder.shardBatchedModificationCount(2); leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); @@ -809,7 +954,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction(); final LinkedList cars = new LinkedList<>(); int carIndex = 1; - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME) @@ -824,7 +969,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction(); for (int i = 1; i <= 5; i++, carIndex++) { - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); } @@ -832,7 +977,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // message on ready. final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction(); - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; @@ -840,11 +985,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // leader shard on ready. final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction(); - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount())); + stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount())); // Disable elections on the leader so it switches to follower. @@ -852,7 +997,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()) .shardElectionTimeoutFactor(10)); - leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars"); + leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorUtils(), "cars"); // Submit all tx's - the messages should get queued for retry. @@ -866,9 +1011,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1)); - IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "cars") + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "cars") .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); - IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "people") + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "people") .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort); @@ -884,8 +1029,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testLeadershipTransferOnShutdown() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.shardBatchedModificationCount(1); followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); final String testName = "testLeadershipTransferOnShutdown"; @@ -912,7 +1057,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize())); writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); writeTx.write(CarsModel.newCarPath("optima"), car); final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); @@ -925,7 +1070,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .shardElectionTimeoutFactor(100)); final FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); - final Future future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars"); + final Future future = leaderDistributedDataStore.getActorUtils().findLocalShardAsync("cars"); final ActorRef leaderActor = Await.result(future, duration); final Future stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE); @@ -949,8 +1094,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithIsolatedLeader() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); // Set the isolated leader check interval high so we can control the switch to IsolatedLeader. leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000); final String testName = "testTransactionWithIsolatedLeader"; @@ -969,9 +1114,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()); // Stop the follower - followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager()); + followerTestKit.watch(followerDistributedDataStore.getActorUtils().getShardManager()); followerDistributedDataStore.close(); - followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager()); + followerTestKit.expectTerminated(followerDistributedDataStore.getActorUtils().getShardManager()); // Submit the preIsolatedLeaderWriteTx so it's pending final DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready(); @@ -1029,7 +1174,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { fail("Exception expected"); } catch (final ExecutionException e) { final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.equals(testParameter)) { + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException || e.getCause() instanceof ShardLeaderNotRespondingException); } else { @@ -1068,7 +1213,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { fail("Exception expected"); } catch (final ExecutionException e) { final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.equals(testParameter)) { + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); } else { assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); @@ -1116,6 +1261,60 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } + @Test + public void testSemiReachableCandidateNotDroppingLeader() throws Exception { + final String testName = "testSemiReachableCandidateNotDroppingLeader"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10); + final IntegrationTestKit follower2TestKit = new IntegrationTestKit( + follower2System, follower2DatastoreContextBuilder, commitTimeout); + + final AbstractDataStore ds2 = + follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS); + + followerTestKit.waitForMembersUp("member-1", "member-3"); + follower2TestKit.waitForMembersUp("member-1", "member-2"); + + TestKit.shutdownActorSystem(follower2System); + + ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + + Cluster leaderCluster = Cluster.get(leaderSystem); + Cluster followerCluster = Cluster.get(followerSystem); + Cluster follower2Cluster = Cluster.get(follower2System); + + Member follower2Member = follower2Cluster.readView().self(); + + await().atMost(10, TimeUnit.SECONDS) + .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member)); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member)); + + ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + + // to simulate a follower not being able to receive messages, but still being able to send messages and becoming + // candidate, we can just send a couple of RequestVotes to both leader and follower. + cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null); + followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null); + cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null); + followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null); + + OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + + assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm()); + assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm()); + + ds2.close(); + } + @Test public void testInstallSnapshot() throws Exception { final String testName = "testInstallSnapshot"; @@ -1129,10 +1328,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { SchemaContextHelper.full()); final ContainerNode carsNode = CarsModel.newCarsNode( - CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)))); + CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)))); AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode); - final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.EMPTY); + final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty()); final Snapshot initialSnapshot = Snapshot.create( new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)), Collections.emptyList(), 5, 1, 5, 1, 1, null, null); @@ -1158,7 +1357,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadWriteMessageSlicing() throws Exception { // The slicing is only implemented for tell-based protocol - Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class)); + Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.maximumMessageSliceSize(100); followerDatastoreContextBuilder.maximumMessageSliceSize(100); @@ -1172,6 +1371,70 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); } + @SuppressWarnings("IllegalCatch") + @Test + public void testRaftCallbackDuringLeadershipDrop() throws Exception { + final String testName = "testRaftCallbackDuringLeadershipDrop"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + + final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, + DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500) + .shardLeaderElectionTimeoutInSeconds(3600), + commitTimeout); + + final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + leaderTestKit.doCommit(initialWriteTx.ready()); + + try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) { + + final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + member2Cars.tell(new StartDropMessages(AppendEntries.class), null); + member3Cars.tell(new StartDropMessages(AppendEntries.class), null); + + final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + final AtomicBoolean submitDone = new AtomicBoolean(false); + executor.submit(() -> { + try { + leaderTestKit.doCommit(newTx.ready()); + submitDone.set(true); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1); + + final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE); + + // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with + // new term(switching to candidate after election timeout) + leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1, + "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1, + -1), member3Cars); + + member2Cars.tell(new StopDropMessages(AppendEntries.class), null); + member3Cars.tell(new StopDropMessages(AppendEntries.class), null); + + await("Is tx stuck in COMMIT_PENDING") + .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true)); + + } + + executor.shutdownNow(); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());