X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=da219faaf1cdd893fc67bf687145b7798849faee;hp=0b3cb0b5e6048d671b48be5704d2ac88cfc21dba;hb=e9efe27538adb5ae575f77fda90f147d46341801;hpb=9bce68c4712d00951d121be68b09578bc6e09151 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 0b3cb0b5e6..da219faaf1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -7,13 +7,15 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -23,30 +25,36 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; +import akka.cluster.Member; import akka.dispatch.Futures; import akka.pattern.Patterns; import akka.testkit.javadsl.TestKit; import com.google.common.base.Stopwatch; -import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; -import java.math.BigInteger; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import org.junit.After; import org.junit.Assume; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -58,7 +66,11 @@ import org.opendaylight.controller.cluster.access.client.RequestTimeoutException import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; +import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; +import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages; +import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -68,10 +80,16 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -82,27 +100,29 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.common.api.TransactionChainListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; import org.opendaylight.mdsal.dom.spi.store.DOMStore; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.Uint64; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode; +import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -118,7 +138,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 } + { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 } }); } @@ -182,9 +202,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderDistributedDataStore.close(); } - TestKit.shutdownActorSystem(leaderSystem); - TestKit.shutdownActorSystem(followerSystem); - TestKit.shutdownActorSystem(follower2System); + TestKit.shutdownActorSystem(leaderSystem, true); + TestKit.shutdownActorSystem(followerSystem, true); + TestKit.shutdownActorSystem(follower2System,true); InMemoryJournal.clear(); InMemorySnapshotStore.clear(); @@ -200,16 +220,23 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) throws Exception { - leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout); + initDatastores(type, moduleShardsConfig, shards, leaderDatastoreContextBuilder, + followerDatastoreContextBuilder); + } + + private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards, + final DatastoreContext.Builder leaderBuilder, final DatastoreContext.Builder followerBuilder) + throws Exception { + leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout); leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); - followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout); + followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout); followerDistributedDataStore = followerTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards); leaderTestKit.waitForMembersUp("member-2"); followerTestKit.waitForMembersUp("member-1"); @@ -217,12 +244,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) throws Exception { - final Optional> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + final Optional optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); - final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( + final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( CarsModel.CAR_QNAME); - for (final NormalizedNode entry: entries) { + for (final NormalizedNode entry: entries) { listBuilder.withChild((MapEntryNode) entry); } @@ -230,8 +257,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path, - final NormalizedNode expNode) throws Exception { - final Optional> optional = readTx.read(path).get(5, TimeUnit.SECONDS); + final NormalizedNode expNode) throws Exception { + final Optional optional = readTx.read(path).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", expNode, optional.get()); } @@ -255,11 +282,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); writeTx.merge(car1Path, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage"); writeTx.merge(car2Path, car2); @@ -328,6 +355,133 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } + @Test + public void testSingleTransactionsWritesInQuickSuccession() throws Exception { + final String testName = "testWriteTransactionWithSingleShard"; + initDatastoresWithCars(testName); + + final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); + + followerTestKit.doCommit(writeTx.ready()); + + DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); + domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + + domStoreReadTransaction.close(); + } + + // wait to let the shard catch up with purged + await("Range set leak test").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Optional localShard = + leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard.get(), new RequestFrontendMetadata()); + + if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + Iterator iterator = + frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); + FrontendHistoryMetadata metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + assertEquals(0, metadata.getClosedTransactions().size()); + + final var purgedRanges = metadata.getPurgedTransactions().ranges(); + assertEquals(1, purgedRanges.size()); + final var purgedRange = purgedRanges.first(); + assertEquals(UnsignedLong.ZERO, purgedRange.lower()); + assertEquals(UnsignedLong.valueOf(10), purgedRange.upper()); + } else { + // ask based should track no metadata + assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + } + }); + + final Optional optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().body()).size()); + } + + @Test + @Ignore("Flushes out tell based leak needs to be handled separately") + public void testCloseTransactionMetadataLeak() throws Exception { + // Ask based frontend seems to have some issues with back to back close + Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class)); + + final String testName = "testWriteTransactionWithSingleShard"; + initDatastoresWithCars(testName); + + final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.close(); + + DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); + domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + + domStoreReadTransaction.close(); + } + + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + // wait to let the shard catch up with purged + await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Optional localShard = + leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard.get(), new RequestFrontendMetadata()); + + if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + Iterator iterator = + frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); + FrontendHistoryMetadata metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + assertEquals(0, metadata.getClosedTransactions().size()); + assertEquals(1, metadata.getPurgedTransactions().size()); + } else { + // ask based should track no metadata + assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + } + }); + + final Optional optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().body()).size()); + } + @Test public void testReadWriteTransactionWithSingleShard() throws Exception { initDatastoresWithCars("testReadWriteTransactionWithSingleShard"); @@ -338,12 +492,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); rwTx.merge(CarsModel.newCarPath("optima"), car1); verifyCars(rwTx, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage"); rwTx.merge(car2Path, car2); @@ -362,11 +516,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertNotNull("newWriteOnlyTransaction returned null", writeTx); final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH; - final NormalizedNode carsNode = CarsModel.emptyContainer(); + final NormalizedNode carsNode = CarsModel.emptyContainer(); writeTx.write(carsPath, carsNode); final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH; - final NormalizedNode peopleNode = PeopleModel.emptyContainer(); + final NormalizedNode peopleNode = PeopleModel.emptyContainer(); writeTx.write(peoplePath, peopleNode); followerTestKit.doCommit(writeTx.ready()); @@ -385,11 +539,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertNotNull("newReadWriteTransaction returned null", rwTx); final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH; - final NormalizedNode carsNode = CarsModel.emptyContainer(); + final NormalizedNode carsNode = CarsModel.emptyContainer(); rwTx.write(carsPath, carsNode); final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH; - final NormalizedNode peopleNode = PeopleModel.emptyContainer(); + final NormalizedNode peopleNode = PeopleModel.emptyContainer(); rwTx.write(peoplePath, peopleNode); followerTestKit.doCommit(rwTx.ready()); @@ -427,7 +581,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); rwTx.write(car1Path, car1); @@ -435,7 +589,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyCars(rwTx, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); rwTx.merge(CarsModel.newCarPath("sportage"), car2); rwTx.delete(car1Path); @@ -468,7 +622,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); readWriteTx.write(carPath, car); @@ -476,7 +630,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); readWriteTx.merge(personPath, person); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + Optional optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", car, optional.get()); @@ -513,7 +667,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); @@ -546,7 +700,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); @@ -602,7 +756,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { TestKit.shutdownActorSystem(leaderSystem, true); Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); - followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS); + followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorUtils(), CARS); leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS); @@ -615,13 +769,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { newMember1TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) { - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS); // Write a car entry to the new leader - should switch to local Tx writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); writeTx.merge(car1Path, car1); @@ -635,10 +789,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader"); - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars"); - final com.google.common.base.Optional carsFollowerShard = - followerDistributedDataStore.getActorContext().findLocalShard("cars"); + final Optional carsFollowerShard = + followerDistributedDataStore.getActorUtils().findLocalShard("cars"); assertTrue("Cars follower shard found", carsFollowerShard.isPresent()); final DataTree dataTree = new InMemoryDataTreeFactory().create( @@ -650,12 +804,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); modification.ready(); - ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, - java.util.Optional.empty()); + ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -670,11 +823,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Send another tx without immediate commit. modification = dataTree.takeSnapshot().newModification(); - MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000)); new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); modification.ready(); - readyLocal = new ReadyLocalTransaction(tx2 , modification, false, java.util.Optional.empty()); + readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -684,13 +837,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); - final ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); final Supplier versionSupplier = Mockito.mock(Supplier.class); Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList( + leaderDistributedDataStore.getActorUtils(), Arrays.asList( new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); @@ -703,10 +856,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testForwardedReadyTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader"); - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars"); - final com.google.common.base.Optional carsFollowerShard = - followerDistributedDataStore.getActorContext().findLocalShard("cars"); + final Optional carsFollowerShard = + followerDistributedDataStore.getActorUtils().findLocalShard("cars"); assertTrue("Cars follower shard found", carsFollowerShard.isPresent()); carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); @@ -718,13 +871,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true, - java.util.Optional.empty()); + Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -739,13 +892,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Send another tx without immediate commit. modification = dataTree.takeSnapshot().newModification(); - MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000)); new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false, - java.util.Optional.empty()); + Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -755,13 +908,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); - ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); final Supplier versionSupplier = Mockito.mock(Supplier.class); Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList( + leaderDistributedDataStore.getActorUtils(), Arrays.asList( new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); @@ -772,8 +925,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionForwardedToLeaderAfterRetry() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); followerDatastoreContextBuilder.shardBatchedModificationCount(2); leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); @@ -809,10 +962,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction(); final LinkedList cars = new LinkedList<>(); int carIndex = 1; - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; - NormalizedNode people = PeopleModel.newPersonMapNode(); + NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME) + .withChild(PeopleModel.newPersonEntry("Dude")).build(); writeTx2.write(PeopleModel.PERSON_LIST_PATH, people); final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); @@ -823,7 +977,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction(); for (int i = 1; i <= 5; i++, carIndex++) { - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); } @@ -831,7 +985,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // message on ready. final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction(); - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; @@ -839,11 +993,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // leader shard on ready. final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction(); - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount())); + stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount())); // Disable elections on the leader so it switches to follower. @@ -851,7 +1005,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()) .shardElectionTimeoutFactor(10)); - leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars"); + leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorUtils(), "cars"); // Submit all tx's - the messages should get queued for retry. @@ -865,9 +1019,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1)); - IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "cars") + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "cars") .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); - IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "people") + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "people") .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort); @@ -883,15 +1037,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testLeadershipTransferOnShutdown() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.shardBatchedModificationCount(1); followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); final String testName = "testLeadershipTransferOnShutdown"; initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE); final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, - DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100), + DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500), commitTimeout); try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) { @@ -911,7 +1065,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize())); writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); writeTx.write(CarsModel.newCarPath("optima"), car); final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); @@ -924,7 +1078,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .shardElectionTimeoutFactor(100)); final FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); - final Future future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars"); + final Future future = leaderDistributedDataStore.getActorUtils().findLocalShardAsync("cars"); final ActorRef leaderActor = Await.result(future, duration); final Future stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE); @@ -948,8 +1102,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithIsolatedLeader() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); // Set the isolated leader check interval high so we can control the switch to IsolatedLeader. leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000); final String testName = "testTransactionWithIsolatedLeader"; @@ -968,9 +1122,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()); // Stop the follower - followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager()); + followerTestKit.watch(followerDistributedDataStore.getActorUtils().getShardManager()); followerDistributedDataStore.close(); - followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager()); + followerTestKit.expectTerminated(followerDistributedDataStore.getActorUtils().getShardManager()); // Submit the preIsolatedLeaderWriteTx so it's pending final DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready(); @@ -1028,7 +1182,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { fail("Exception expected"); } catch (final ExecutionException e) { final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.equals(testParameter)) { + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException || e.getCause() instanceof ShardLeaderNotRespondingException); } else { @@ -1067,7 +1221,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { fail("Exception expected"); } catch (final ExecutionException e) { final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.equals(testParameter)) { + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); } else { assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); @@ -1115,6 +1269,61 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } + @Test + public void testSemiReachableCandidateNotDroppingLeader() throws Exception { + final String testName = "testSemiReachableCandidateNotDroppingLeader"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10); + final IntegrationTestKit follower2TestKit = new IntegrationTestKit( + follower2System, follower2DatastoreContextBuilder, commitTimeout); + + final AbstractDataStore ds2 = + follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS); + + followerTestKit.waitForMembersUp("member-1", "member-3"); + follower2TestKit.waitForMembersUp("member-1", "member-2"); + + // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option + TestKit.shutdownActorSystem(follower2System, true); + + ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + + Cluster leaderCluster = Cluster.get(leaderSystem); + Cluster followerCluster = Cluster.get(followerSystem); + Cluster follower2Cluster = Cluster.get(follower2System); + + Member follower2Member = follower2Cluster.readView().self(); + + await().atMost(10, TimeUnit.SECONDS) + .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member)); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member)); + + ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + + // to simulate a follower not being able to receive messages, but still being able to send messages and becoming + // candidate, we can just send a couple of RequestVotes to both leader and follower. + cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null); + followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null); + cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null); + followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null); + + OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + + assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm()); + assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm()); + + ds2.close(); + } + @Test public void testInstallSnapshot() throws Exception { final String testName = "testInstallSnapshot"; @@ -1128,10 +1337,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { SchemaContextHelper.full()); final ContainerNode carsNode = CarsModel.newCarsNode( - CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)))); + CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)))); AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode); - final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.EMPTY); + final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty()); final Snapshot initialSnapshot = Snapshot.create( new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)), Collections.emptyList(), 5, 1, 5, 1, 1, null, null); @@ -1142,7 +1351,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastoresWithCars(testName); - final Optional> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read( + final Optional readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read( CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", readOptional.isPresent()); assertEquals("Node", carsNode, readOptional.get()); @@ -1157,7 +1366,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadWriteMessageSlicing() throws Exception { // The slicing is only implemented for tell-based protocol - Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class)); + Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.maximumMessageSliceSize(100); followerDatastoreContextBuilder.maximumMessageSliceSize(100); @@ -1165,14 +1374,145 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); - final NormalizedNode carsNode = CarsModel.create(); + final NormalizedNode carsNode = CarsModel.create(); rwTx.write(CarsModel.BASE_PATH, carsNode); verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); } + @SuppressWarnings("IllegalCatch") + @Test + public void testRaftCallbackDuringLeadershipDrop() throws Exception { + final String testName = "testRaftCallbackDuringLeadershipDrop"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + + final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, + DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500) + .shardLeaderElectionTimeoutInSeconds(3600), + commitTimeout); + + final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + leaderTestKit.doCommit(initialWriteTx.ready()); + + try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) { + + final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + member2Cars.tell(new StartDropMessages(AppendEntries.class), null); + member3Cars.tell(new StartDropMessages(AppendEntries.class), null); + + final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + final AtomicBoolean submitDone = new AtomicBoolean(false); + executor.submit(() -> { + try { + leaderTestKit.doCommit(newTx.ready()); + submitDone.set(true); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1); + + final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE); + + // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with + // new term(switching to candidate after election timeout) + leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1, + "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1, + -1), member3Cars); + + member2Cars.tell(new StopDropMessages(AppendEntries.class), null); + member3Cars.tell(new StopDropMessages(AppendEntries.class), null); + + await("Is tx stuck in COMMIT_PENDING") + .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true)); + + } + + executor.shutdownNow(); + } + + @Test + public void testSnapshotOnRootOverwrite() throws Exception { + if (!DistributedDataStore.class.isAssignableFrom(testParameter)) { + // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate + return; + } + + final String testName = "testSnapshotOnRootOverwrite"; + final String[] shards = {"cars", "default"}; + initDatastores(testName, "module-shards-default-cars-member1-and-2.conf", shards, + leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true), + followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); + + leaderTestKit.waitForMembersUp("member-2"); + final ContainerNode rootNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) + .withChild(CarsModel.create()) + .build(); + + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(1, state.getSnapshotIndex())); + + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(1, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); + + for (int i = 0; i < 10; i++) { + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i), + CarsModel.newCarEntry("car " + i, Uint64.ONE)); + } + + // fake snapshot causes the snapshotIndex to move + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(10, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(10, state.getSnapshotIndex())); + + // however the real snapshot still has not changed and was taken at index 1 + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); + + // root overwrite so expect a snapshot + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + // this was a real snapshot so everything should be in it(1(DisableTrackingPayload) + 1 + 10 + 1) + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(12, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(12, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 12); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 12); + } + + private void verifySnapshot(final String persistenceId, final long lastAppliedIndex) { + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + List snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); + assertEquals(1, snap.size()); + assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex()); + } + ); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, - final NormalizedNode expRoot) { + final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm()); assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex()); assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());