X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=4b58f3673b077d419c69f8e17e30cb51c59a197d;hp=5abadccd500d42745dde4c37998c2ee885a2a11e;hb=0b051b8d889d0a0945ee888ed870029f6507c9e1;hpb=66e553f2098ea61426c4a441be57395673535c2f diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 5abadccd50..4b58f3673b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -8,13 +8,19 @@ package org.opendaylight.controller.cluster.datastore; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -24,41 +30,36 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; +import akka.cluster.Member; import akka.dispatch.Futures; import akka.pattern.Patterns; import akka.testkit.javadsl.TestKit; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Range; -import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; -import java.math.BigInteger; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; import org.junit.After; -import org.junit.Assume; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; -import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -67,6 +68,8 @@ import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; +import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages; +import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -76,12 +79,17 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata; import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongBitmap; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -92,6 +100,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; @@ -101,18 +110,21 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.Uint64; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode; +import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.collection.Set; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -192,9 +204,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderDistributedDataStore.close(); } - TestKit.shutdownActorSystem(leaderSystem); - TestKit.shutdownActorSystem(followerSystem); - TestKit.shutdownActorSystem(follower2System); + TestKit.shutdownActorSystem(leaderSystem, true); + TestKit.shutdownActorSystem(followerSystem, true); + TestKit.shutdownActorSystem(follower2System,true); InMemoryJournal.clear(); InMemorySnapshotStore.clear(); @@ -210,12 +222,19 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) throws Exception { - leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout); + initDatastores(type, moduleShardsConfig, shards, leaderDatastoreContextBuilder, + followerDatastoreContextBuilder); + } + + private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards, + final DatastoreContext.Builder leaderBuilder, final DatastoreContext.Builder followerBuilder) + throws Exception { + leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout); leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); - followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout); + followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout); followerDistributedDataStore = followerTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); @@ -227,12 +246,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) throws Exception { - final Optional> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + final Optional optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); - final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( + final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( CarsModel.CAR_QNAME); - for (final NormalizedNode entry: entries) { + for (final NormalizedNode entry: entries) { listBuilder.withChild((MapEntryNode) entry); } @@ -240,16 +259,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path, - final NormalizedNode expNode) throws Exception { - final Optional> optional = readTx.read(path).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", expNode, optional.get()); + final NormalizedNode expNode) throws Exception { + assertEquals(Optional.of(expNode), readTx.read(path).get(5, TimeUnit.SECONDS)); } private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path) throws Exception { - final Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS); - assertEquals("exists", Boolean.TRUE, exists); + assertEquals("exists", Boolean.TRUE, readTx.exists(path).get(5, TimeUnit.SECONDS)); } @Test @@ -265,11 +281,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); writeTx.merge(car1Path, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage"); writeTx.merge(car2Path, car2); @@ -340,8 +356,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testSingleTransactionsWritesInQuickSuccession() throws Exception { - final String testName = "testWriteTransactionWithSingleShard"; - initDatastoresWithCars(testName); + initDatastoresWithCars("testSingleTransactionsWritesInQuickSuccession"); final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); @@ -353,58 +368,69 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { int numCars = 5; for (int i = 0; i < numCars; i++) { writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); - + writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); followerTestKit.doCommit(writeTx.ready()); - DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); - domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); - - domStoreReadTransaction.close(); + try (var tx = txChain.newReadOnlyTransaction()) { + tx.read(CarsModel.BASE_PATH).get(); + } } // wait to let the shard catch up with purged await("Range set leak test").atMost(5, TimeUnit.SECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) .untilAsserted(() -> { - Optional localShard = - leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); - FrontendShardDataTreeSnapshotMetadata frontendMetadata = - (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() - .executeOperation(localShard.get(), new RequestFrontendMetadata()); + final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars") + .orElseThrow(); + final var frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard, new RequestFrontendMetadata()); + final var clientMeta = frontendMetadata.getClients().get(0); if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - Iterator iterator = - frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); - FrontendHistoryMetadata metadata = iterator.next(); - while (iterator.hasNext() && metadata.getHistoryId() != 1) { - metadata = iterator.next(); - } - - assertEquals(0, metadata.getClosedTransactions().size()); - assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)), - metadata.getPurgedTransactions().asRanges().iterator().next()); + assertTellClientMetadata(clientMeta, numCars * 2); } else { - // ask based should track no metadata - assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + assertAskClientMetadata(clientMeta); } }); - final Optional> optional = txChain.newReadOnlyTransaction() - .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + try (var tx = txChain.newReadOnlyTransaction()) { + final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body(); + assertThat(body, instanceOf(Collection.class)); + assertEquals(numCars, ((Collection) body).size()); + } + } + + private void assertAskClientMetadata(final FrontendClientMetadata clientMeta) { + // ask based should track no metadata + assertEquals(List.of(), clientMeta.getCurrentHistories()); + } + + private void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) { + final var iterator = clientMeta.getCurrentHistories().iterator(); + var metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + // FIXME: CONTROLLER-1991: remove this assumption + assumeTrue(false); + + assertEquals(UnsignedLongBitmap.of(), metadata.getClosedTransactions()); + assertEquals("[[0.." + lastPurged + "]]", metadata.getPurgedTransactions().ranges().toString()); } @Test - @Ignore("Flushes out tell based leak needs to be handled separately") public void testCloseTransactionMetadataLeak() throws Exception { - // Ask based frontend seems to have some issues with back to back close - Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class)); + // FIXME: CONTROLLER-2016: ask-based frontend triggers this: + // + // java.lang.IllegalStateException: Previous transaction + // member-2-datastore-testCloseTransactionMetadataLeak-fe-0-chn-1-txn-1-0 is not ready yet + // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy$Allocated.checkReady() + // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy.newReadOnlyTransaction() + assumeTrue(testParameter.isAssignableFrom(ClientBackedDataStore.class)); - final String testName = "testWriteTransactionWithSingleShard"; - initDatastoresWithCars(testName); + initDatastoresWithCars("testCloseTransactionMetadataLeak"); final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); @@ -418,10 +444,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx = txChain.newWriteOnlyTransaction(); writeTx.close(); - DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); - domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); - - domStoreReadTransaction.close(); + try (var tx = txChain.newReadOnlyTransaction()) { + tx.read(CarsModel.BASE_PATH).get(); + } } writeTx = txChain.newWriteOnlyTransaction(); @@ -433,34 +458,25 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) .untilAsserted(() -> { - Optional localShard = - leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); - FrontendShardDataTreeSnapshotMetadata frontendMetadata = + final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars") + .orElseThrow(); + final var frontendMetadata = (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() - .executeOperation(localShard.get(), new RequestFrontendMetadata()); + .executeOperation(localShard, new RequestFrontendMetadata()); + final var clientMeta = frontendMetadata.getClients().get(0); if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - Iterator iterator = - frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); - FrontendHistoryMetadata metadata = iterator.next(); - while (iterator.hasNext() && metadata.getHistoryId() != 1) { - metadata = iterator.next(); - } - - Set> ranges = metadata.getPurgedTransactions().asRanges(); - - assertEquals(0, metadata.getClosedTransactions().size()); - assertEquals(1, ranges.size()); + assertTellClientMetadata(clientMeta, numCars * 2 + 1); } else { - // ask based should track no metadata - assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + assertAskClientMetadata(clientMeta); } }); - final Optional> optional = txChain.newReadOnlyTransaction() - .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + try (var tx = txChain.newReadOnlyTransaction()) { + final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body(); + assertThat(body, instanceOf(Collection.class)); + assertEquals(numCars, ((Collection) body).size()); + } } @Test @@ -473,12 +489,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); rwTx.merge(CarsModel.newCarPath("optima"), car1); verifyCars(rwTx, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage"); rwTx.merge(car2Path, car2); @@ -497,11 +513,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertNotNull("newWriteOnlyTransaction returned null", writeTx); final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH; - final NormalizedNode carsNode = CarsModel.emptyContainer(); + final NormalizedNode carsNode = CarsModel.emptyContainer(); writeTx.write(carsPath, carsNode); final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH; - final NormalizedNode peopleNode = PeopleModel.emptyContainer(); + final NormalizedNode peopleNode = PeopleModel.emptyContainer(); writeTx.write(peoplePath, peopleNode); followerTestKit.doCommit(writeTx.ready()); @@ -520,11 +536,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertNotNull("newReadWriteTransaction returned null", rwTx); final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH; - final NormalizedNode carsNode = CarsModel.emptyContainer(); + final NormalizedNode carsNode = CarsModel.emptyContainer(); rwTx.write(carsPath, carsNode); final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH; - final NormalizedNode peopleNode = PeopleModel.emptyContainer(); + final NormalizedNode peopleNode = PeopleModel.emptyContainer(); rwTx.write(peoplePath, peopleNode); followerTestKit.doCommit(rwTx.ready()); @@ -562,7 +578,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); rwTx.write(car1Path, car1); @@ -570,7 +586,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyCars(rwTx, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); rwTx.merge(CarsModel.newCarPath("sportage"), car2); rwTx.delete(car1Path); @@ -603,7 +619,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); readWriteTx.write(carPath, car); @@ -611,13 +627,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); readWriteTx.merge(personPath, person); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", car, optional.get()); - - optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", person, optional.get()); + assertEquals(Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS)); + assertEquals(Optional.of(person), readWriteTx.read(personPath).get(5, TimeUnit.SECONDS)); final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); @@ -635,8 +646,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); verifyCars(readTx, car); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertFalse("isPresent", optional.isPresent()); + assertEquals(Optional.empty(), readTx.read(personPath).get(5, TimeUnit.SECONDS)); } @Test @@ -648,7 +658,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); + final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); @@ -659,12 +669,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } + final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)) + .getCause(); + assertThat(ex, instanceOf(TransactionCommitFailedException.class)); verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); @@ -681,7 +688,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); + final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); @@ -696,12 +703,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // done for put for performance reasons. writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } + final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)) + .getCause(); + assertThat(ex, instanceOf(TransactionCommitFailedException.class)); verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); @@ -756,7 +760,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); writeTx.merge(car1Path, car1); @@ -785,7 +789,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); modification.ready(); @@ -804,7 +808,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Send another tx without immediate commit. modification = dataTree.takeSnapshot().newModification(); - MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000)); new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); modification.ready(); @@ -821,11 +825,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); - final Supplier versionSupplier = Mockito.mock(Supplier.class); - Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); - ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorUtils(), Arrays.asList( - new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); + ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(leaderDistributedDataStore.getActorUtils(), + List.of(new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), + () -> DataStoreVersions.CURRENT_VERSION)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -852,13 +854,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); - ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, - DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true, - Optional.empty()); + ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION, + new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification), + true, Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -873,13 +874,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Send another tx without immediate commit. modification = dataTree.takeSnapshot().newModification(); - MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000)); new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); - forwardedReady = new ForwardedReadyTransaction(tx2, - DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false, - Optional.empty()); + forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION, + new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification), + false, Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -892,11 +892,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); - final Supplier versionSupplier = Mockito.mock(Supplier.class); - Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorUtils(), Arrays.asList( - new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); + leaderDistributedDataStore.getActorUtils(), List.of( + new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), + () -> DataStoreVersions.CURRENT_VERSION)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -907,7 +906,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionForwardedToLeaderAfterRetry() throws Exception { // FIXME: remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + followerDatastoreContextBuilder.shardBatchedModificationCount(2); leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); @@ -943,10 +943,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction(); final LinkedList cars = new LinkedList<>(); int carIndex = 1; - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; - NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME) + NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME) .withChild(PeopleModel.newPersonEntry("Dude")).build(); writeTx2.write(PeopleModel.PERSON_LIST_PATH, people); final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); @@ -958,7 +958,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction(); for (int i = 1; i <= 5; i++, carIndex++) { - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); } @@ -966,7 +966,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // message on ready. final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction(); - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; @@ -974,7 +974,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // leader shard on ready. final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction(); - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", @@ -1019,7 +1019,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testLeadershipTransferOnShutdown() throws Exception { // FIXME: remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + leaderDatastoreContextBuilder.shardBatchedModificationCount(1); followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); final String testName = "testLeadershipTransferOnShutdown"; @@ -1046,7 +1047,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize())); writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); writeTx.write(CarsModel.newCarPath("optima"), car); final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); @@ -1084,7 +1085,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithIsolatedLeader() throws Exception { // FIXME: remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + // Set the isolated leader check interval high so we can control the switch to IsolatedLeader. leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000); final String testName = "testTransactionWithIsolatedLeader"; @@ -1117,12 +1119,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState())); - try { - leaderTestKit.doCommit(noShardLeaderWriteTx.ready()); - fail("Expected NoShardLeaderException"); - } catch (final ExecutionException e) { - assertEquals("getCause", NoShardLeaderException.class, Throwables.getRootCause(e).getClass()); - } + final var ex = assertThrows(ExecutionException.class, + () -> leaderTestKit.doCommit(noShardLeaderWriteTx.ready())); + assertEquals(NoShardLeaderException.class, Throwables.getRootCause(ex).getClass()); sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder .shardElectionTimeoutFactor(100)); @@ -1158,17 +1157,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - try { - followerTestKit.doCommit(rwTx.ready()); - fail("Exception expected"); - } catch (final ExecutionException e) { - final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.isAssignableFrom(testParameter)) { - assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException - || e.getCause() instanceof ShardLeaderNotRespondingException); - } else { - assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); - } + final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready())); + final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()); + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { + assertTrue(msg, Throwables.getRootCause(ex) instanceof NoShardLeaderException + || ex.getCause() instanceof ShardLeaderNotRespondingException); + } else { + assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class)); } } @@ -1197,16 +1192,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - try { - followerTestKit.doCommit(rwTx.ready()); - fail("Exception expected"); - } catch (final ExecutionException e) { - final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.isAssignableFrom(testParameter)) { - assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); - } else { - assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); - } + final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready())); + final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()); + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { + assertThat(msg, Throwables.getRootCause(ex), instanceOf(NoShardLeaderException.class)); + } else { + assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class)); } } @@ -1250,6 +1241,68 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } + @Test + public void testSemiReachableCandidateNotDroppingLeader() throws Exception { + final String testName = "testSemiReachableCandidateNotDroppingLeader"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10); + final IntegrationTestKit follower2TestKit = new IntegrationTestKit( + follower2System, follower2DatastoreContextBuilder, commitTimeout); + + final AbstractDataStore ds2 = + follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS); + + followerTestKit.waitForMembersUp("member-1", "member-3"); + follower2TestKit.waitForMembersUp("member-1", "member-2"); + + // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option + TestKit.shutdownActorSystem(follower2System, true); + + ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + + Cluster leaderCluster = Cluster.get(leaderSystem); + Cluster followerCluster = Cluster.get(followerSystem); + Cluster follower2Cluster = Cluster.get(follower2System); + + Member follower2Member = follower2Cluster.readView().self(); + + await().atMost(10, TimeUnit.SECONDS) + .until(() -> containsUnreachable(leaderCluster, follower2Member)); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> containsUnreachable(followerCluster, follower2Member)); + + ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + + // to simulate a follower not being able to receive messages, but still being able to send messages and becoming + // candidate, we can just send a couple of RequestVotes to both leader and follower. + cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null); + followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null); + cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null); + followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null); + + OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + + assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm()); + assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm()); + + ds2.close(); + } + + private static Boolean containsUnreachable(final Cluster cluster, final Member member) { + // unreachableMembers() returns scala.collection.immutable.Set, but we are using scala.collection.Set to fix JDT + // see https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276#c32 + final Set members = cluster.readView().unreachableMembers(); + return members.contains(member); + } + @Test public void testInstallSnapshot() throws Exception { final String testName = "testInstallSnapshot"; @@ -1263,10 +1316,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { SchemaContextHelper.full()); final ContainerNode carsNode = CarsModel.newCarsNode( - CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)))); + CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)))); AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode); - final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty()); + final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty()); final Snapshot initialSnapshot = Snapshot.create( new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)), Collections.emptyList(), 5, 1, 5, 1, 1, null, null); @@ -1277,10 +1330,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastoresWithCars(testName); - final Optional> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read( - CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", readOptional.isPresent()); - assertEquals("Node", carsNode, readOptional.get()); + assertEquals(Optional.of(carsNode), leaderDistributedDataStore.newReadOnlyTransaction().read( + CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS)); verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class), initialSnapshot, snapshotRoot); @@ -1292,7 +1343,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadWriteMessageSlicing() throws Exception { // The slicing is only implemented for tell-based protocol - Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); + assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.maximumMessageSliceSize(100); followerDatastoreContextBuilder.maximumMessageSliceSize(100); @@ -1300,14 +1351,143 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); - final NormalizedNode carsNode = CarsModel.create(); + final NormalizedNode carsNode = CarsModel.create(); rwTx.write(CarsModel.BASE_PATH, carsNode); verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); } + @SuppressWarnings("IllegalCatch") + @Test + public void testRaftCallbackDuringLeadershipDrop() throws Exception { + final String testName = "testRaftCallbackDuringLeadershipDrop"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + + final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, + DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500) + .shardLeaderElectionTimeoutInSeconds(3600), + commitTimeout); + + final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + leaderTestKit.doCommit(initialWriteTx.ready()); + + try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) { + + final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + member2Cars.tell(new StartDropMessages(AppendEntries.class), null); + member3Cars.tell(new StartDropMessages(AppendEntries.class), null); + + final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + final AtomicBoolean submitDone = new AtomicBoolean(false); + executor.submit(() -> { + try { + leaderTestKit.doCommit(newTx.ready()); + submitDone.set(true); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1); + + final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE); + + // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with + // new term(switching to candidate after election timeout) + leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1, + "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1, + -1), member3Cars); + + member2Cars.tell(new StopDropMessages(AppendEntries.class), null); + member3Cars.tell(new StopDropMessages(AppendEntries.class), null); + + await("Is tx stuck in COMMIT_PENDING") + .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true)); + + } + + executor.shutdownNow(); + } + + @Test + public void testSnapshotOnRootOverwrite() throws Exception { + // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + + final String testName = "testSnapshotOnRootOverwrite"; + final String[] shards = {"cars", "default"}; + initDatastores(testName, "module-shards-default-cars-member1-and-2.conf", shards, + leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true), + followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); + + leaderTestKit.waitForMembersUp("member-2"); + final ContainerNode rootNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) + .withChild(CarsModel.create()) + .build(); + + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(1, state.getSnapshotIndex())); + + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(1, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); + + for (int i = 0; i < 10; i++) { + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i), + CarsModel.newCarEntry("car " + i, Uint64.ONE)); + } + + // fake snapshot causes the snapshotIndex to move + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(10, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(10, state.getSnapshotIndex())); + + // however the real snapshot still has not changed and was taken at index 1 + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); + + // root overwrite so expect a snapshot + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + // this was a real snapshot so everything should be in it(1(DisableTrackingPayload) + 1 + 10 + 1) + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(12, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(12, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 12); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 12); + } + + private void verifySnapshot(final String persistenceId, final long lastAppliedIndex) { + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + List snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); + assertEquals(1, snap.size()); + assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex()); + } + ); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, - final NormalizedNode expRoot) { + final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm()); assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex()); assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm()); @@ -1320,10 +1500,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) { final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build()); - final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); + final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class); final Answer answer = invocation -> newBuilder.build(); - Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext(); - Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString()); + doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext(); + doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(anyString()); dataStore.onDatastoreContextUpdated(mockContextFactory); } }