X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=323e9d6c5a8b59f7fc29094b01d51c6ee732d5dc;hp=e403bbc20a2ccc5837121d6e502eaa72d2b53c2d;hb=7bc006db12e2d24756192309515f3d0bc65442f1;hpb=e66759266dc43d5f58b2837aca5047b42c205e4a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index e403bbc20a..323e9d6c5a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -8,13 +8,21 @@ package org.opendaylight.controller.cluster.datastore; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -31,8 +39,6 @@ import akka.testkit.javadsl.TestKit; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Range; -import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -40,25 +46,22 @@ import com.typesafe.config.ConfigFactory; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; import org.junit.After; -import org.junit.Assume; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; -import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -67,6 +70,8 @@ import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; +import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages; +import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -76,14 +81,16 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata; import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongBitmap; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; @@ -95,6 +102,8 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.OptimisticLockFailedException; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; @@ -108,15 +117,18 @@ import org.opendaylight.yangtools.yang.common.Uint64; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode; +import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.collection.Set; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -132,7 +144,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 } + { TestDistributedDataStore.class, 7 }, { TestClientBackedDataStore.class, 12 } }); } @@ -196,9 +208,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderDistributedDataStore.close(); } - TestKit.shutdownActorSystem(leaderSystem); - TestKit.shutdownActorSystem(followerSystem); - TestKit.shutdownActorSystem(follower2System); + TestKit.shutdownActorSystem(leaderSystem, true); + TestKit.shutdownActorSystem(followerSystem, true); + TestKit.shutdownActorSystem(follower2System,true); InMemoryJournal.clear(); InMemorySnapshotStore.clear(); @@ -214,12 +226,19 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) throws Exception { - leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout); + initDatastores(type, moduleShardsConfig, shards, leaderDatastoreContextBuilder, + followerDatastoreContextBuilder); + } + + private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards, + final DatastoreContext.Builder leaderBuilder, final DatastoreContext.Builder followerBuilder) + throws Exception { + leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout); leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); - followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout); + followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout); followerDistributedDataStore = followerTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); @@ -231,12 +250,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) throws Exception { - final Optional> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + final Optional optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); - final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( + final CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder( CarsModel.CAR_QNAME); - for (final NormalizedNode entry: entries) { + for (final NormalizedNode entry: entries) { listBuilder.withChild((MapEntryNode) entry); } @@ -244,16 +263,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path, - final NormalizedNode expNode) throws Exception { - final Optional> optional = readTx.read(path).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", expNode, optional.get()); + final NormalizedNode expNode) throws Exception { + assertEquals(Optional.of(expNode), readTx.read(path).get(5, TimeUnit.SECONDS)); } private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path) throws Exception { - final Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS); - assertEquals("exists", Boolean.TRUE, exists); + assertEquals("exists", Boolean.TRUE, readTx.exists(path).get(5, TimeUnit.SECONDS)); } @Test @@ -344,8 +360,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testSingleTransactionsWritesInQuickSuccession() throws Exception { - final String testName = "testWriteTransactionWithSingleShard"; - initDatastoresWithCars(testName); + initDatastoresWithCars("testSingleTransactionsWritesInQuickSuccession"); final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); @@ -357,58 +372,66 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { int numCars = 5; for (int i = 0; i < numCars; i++) { writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); - + writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); followerTestKit.doCommit(writeTx.ready()); - DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); - domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); - - domStoreReadTransaction.close(); + try (var tx = txChain.newReadOnlyTransaction()) { + tx.read(CarsModel.BASE_PATH).get(); + } } // wait to let the shard catch up with purged await("Range set leak test").atMost(5, TimeUnit.SECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) .untilAsserted(() -> { - Optional localShard = - leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); - FrontendShardDataTreeSnapshotMetadata frontendMetadata = - (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() - .executeOperation(localShard.get(), new RequestFrontendMetadata()); + final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars") + .orElseThrow(); + final var frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard, new RequestFrontendMetadata()); + final var clientMeta = frontendMetadata.getClients().get(0); if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - Iterator iterator = - frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); - FrontendHistoryMetadata metadata = iterator.next(); - while (iterator.hasNext() && metadata.getHistoryId() != 1) { - metadata = iterator.next(); - } - - assertEquals(0, metadata.getClosedTransactions().size()); - assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)), - metadata.getPurgedTransactions().asRanges().iterator().next()); + assertTellClientMetadata(clientMeta, numCars * 2); } else { - // ask based should track no metadata - assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + assertAskClientMetadata(clientMeta); } }); - final Optional> optional = txChain.newReadOnlyTransaction() - .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + try (var tx = txChain.newReadOnlyTransaction()) { + final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body(); + assertThat(body, instanceOf(Collection.class)); + assertEquals(numCars, ((Collection) body).size()); + } + } + + private static void assertAskClientMetadata(final FrontendClientMetadata clientMeta) { + // ask based should track no metadata + assertEquals(List.of(), clientMeta.getCurrentHistories()); + } + + private static void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) { + final var iterator = clientMeta.getCurrentHistories().iterator(); + var metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + assertEquals(UnsignedLongBitmap.of(), metadata.getClosedTransactions()); + assertEquals("[[0.." + lastPurged + "]]", metadata.getPurgedTransactions().ranges().toString()); } @Test - @Ignore("Flushes out tell based leak needs to be handled separately") public void testCloseTransactionMetadataLeak() throws Exception { - // Ask based frontend seems to have some issues with back to back close - Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class)); + // FIXME: CONTROLLER-2016: ask-based frontend triggers this: + // + // java.lang.IllegalStateException: Previous transaction + // member-2-datastore-testCloseTransactionMetadataLeak-fe-0-chn-1-txn-1-0 is not ready yet + // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy$Allocated.checkReady() + // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy.newReadOnlyTransaction() + assumeTrue(testParameter.isAssignableFrom(ClientBackedDataStore.class)); - final String testName = "testWriteTransactionWithSingleShard"; - initDatastoresWithCars(testName); + initDatastoresWithCars("testCloseTransactionMetadataLeak"); final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); @@ -419,52 +442,32 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { int numCars = 5; for (int i = 0; i < numCars; i++) { - writeTx = txChain.newWriteOnlyTransaction(); - writeTx.close(); - - DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); - domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + try (var tx = txChain.newWriteOnlyTransaction()) { + // Empty on purpose + } - domStoreReadTransaction.close(); + try (var tx = txChain.newReadOnlyTransaction()) { + tx.read(CarsModel.BASE_PATH).get(); + } } - writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - followerTestKit.doCommit(writeTx.ready()); - // wait to let the shard catch up with purged - await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS) + await("wait for purges to settle").atMost(5, TimeUnit.SECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) .untilAsserted(() -> { - Optional localShard = - leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); - FrontendShardDataTreeSnapshotMetadata frontendMetadata = + final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars") + .orElseThrow(); + final var frontendMetadata = (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() - .executeOperation(localShard.get(), new RequestFrontendMetadata()); + .executeOperation(localShard, new RequestFrontendMetadata()); + final var clientMeta = frontendMetadata.getClients().get(0); if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - Iterator iterator = - frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); - FrontendHistoryMetadata metadata = iterator.next(); - while (iterator.hasNext() && metadata.getHistoryId() != 1) { - metadata = iterator.next(); - } - - Set> ranges = metadata.getPurgedTransactions().asRanges(); - - assertEquals(0, metadata.getClosedTransactions().size()); - assertEquals(1, ranges.size()); + assertTellClientMetadata(clientMeta, numCars * 2); } else { - // ask based should track no metadata - assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + assertAskClientMetadata(clientMeta); } }); - - final Optional> optional = txChain.newReadOnlyTransaction() - .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); } @Test @@ -501,11 +504,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertNotNull("newWriteOnlyTransaction returned null", writeTx); final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH; - final NormalizedNode carsNode = CarsModel.emptyContainer(); + final NormalizedNode carsNode = CarsModel.emptyContainer(); writeTx.write(carsPath, carsNode); final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH; - final NormalizedNode peopleNode = PeopleModel.emptyContainer(); + final NormalizedNode peopleNode = PeopleModel.emptyContainer(); writeTx.write(peoplePath, peopleNode); followerTestKit.doCommit(writeTx.ready()); @@ -524,11 +527,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { assertNotNull("newReadWriteTransaction returned null", rwTx); final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH; - final NormalizedNode carsNode = CarsModel.emptyContainer(); + final NormalizedNode carsNode = CarsModel.emptyContainer(); rwTx.write(carsPath, carsNode); final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH; - final NormalizedNode peopleNode = PeopleModel.emptyContainer(); + final NormalizedNode peopleNode = PeopleModel.emptyContainer(); rwTx.write(peoplePath, peopleNode); followerTestKit.doCommit(rwTx.ready()); @@ -615,13 +618,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); readWriteTx.merge(personPath, person); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", car, optional.get()); - - optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", person, optional.get()); + assertEquals(Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS)); + assertEquals(Optional.of(person), readWriteTx.read(personPath).get(5, TimeUnit.SECONDS)); final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); @@ -639,8 +637,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); verifyCars(readTx, car); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertFalse("isPresent", optional.isPresent()); + assertEquals(Optional.empty(), readTx.read(personPath).get(5, TimeUnit.SECONDS)); } @Test @@ -652,7 +649,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); + final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); @@ -663,12 +660,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } + final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)) + .getCause(); + assertThat(ex, instanceOf(TransactionCommitFailedException.class)); verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); @@ -685,7 +679,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), MoreExecutors.directExecutor()); - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); + final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); @@ -700,12 +694,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // done for put for performance reasons. writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } + final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)) + .getCause(); + assertThat(ex, instanceOf(TransactionCommitFailedException.class)); verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); @@ -825,11 +816,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); - final Supplier versionSupplier = Mockito.mock(Supplier.class); - Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); - ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorUtils(), Arrays.asList( - new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); + ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(leaderDistributedDataStore.getActorUtils(), + List.of(new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), + () -> DataStoreVersions.CURRENT_VERSION)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -859,10 +848,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); - ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, - DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true, - Optional.empty()); + ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION, + new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification), + true, Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -880,10 +868,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000)); new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); - forwardedReady = new ForwardedReadyTransaction(tx2, - DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false, - Optional.empty()); + forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION, + new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification), + false, Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -896,11 +883,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); - final Supplier versionSupplier = Mockito.mock(Supplier.class); - Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorUtils(), Arrays.asList( - new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); + leaderDistributedDataStore.getActorUtils(), List.of( + new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), + () -> DataStoreVersions.CURRENT_VERSION)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -910,8 +896,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionForwardedToLeaderAfterRetry() throws Exception { - // FIXME: remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); followerDatastoreContextBuilder.shardBatchedModificationCount(2); leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); @@ -950,7 +934,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; - NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME) + NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME) .withChild(PeopleModel.newPersonEntry("Dude")).build(); writeTx2.write(PeopleModel.PERSON_LIST_PATH, people); final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); @@ -981,6 +965,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + // FIXME: CONTROLLER-2017: ClientBackedDataStore reports only 4 transactions + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount())); @@ -1022,8 +1008,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testLeadershipTransferOnShutdown() throws Exception { - // FIXME: remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.shardBatchedModificationCount(1); followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); final String testName = "testLeadershipTransferOnShutdown"; @@ -1046,16 +1030,21 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize())); + final var usesCohorts = DistributedDataStore.class.isAssignableFrom(testParameter); + if (usesCohorts) { + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", + stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize())); + } writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); writeTx.write(CarsModel.newCarPath("optima"), car); final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize())); + if (usesCohorts) { + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", + stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize())); + } // Gracefully stop the leader via a Shutdown message. @@ -1087,8 +1076,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithIsolatedLeader() throws Exception { - // FIXME: remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); // Set the isolated leader check interval high so we can control the switch to IsolatedLeader. leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000); final String testName = "testTransactionWithIsolatedLeader"; @@ -1121,11 +1108,23 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState())); - try { - leaderTestKit.doCommit(noShardLeaderWriteTx.ready()); - fail("Expected NoShardLeaderException"); - } catch (final ExecutionException e) { - assertEquals("getCause", NoShardLeaderException.class, Throwables.getRootCause(e).getClass()); + final var noShardLeaderCohort = noShardLeaderWriteTx.ready(); + final ListenableFuture canCommit; + + // There is difference in behavior here: + if (!leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + // ask-based canCommit() times out and aborts + final var ex = assertThrows(ExecutionException.class, + () -> leaderTestKit.doCommit(noShardLeaderCohort)).getCause(); + assertThat(ex, instanceOf(NoShardLeaderException.class)); + assertThat(ex.getMessage(), containsString( + "Shard member-1-shard-cars-testTransactionWithIsolatedLeader currently has no leader.")); + canCommit = null; + } else { + // tell-based canCommit() does not have a real timeout and hence continues + canCommit = noShardLeaderCohort.canCommit(); + Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS); + assertFalse(canCommit.isDone()); } sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder @@ -1138,6 +1137,19 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderTestKit.doCommit(preIsolatedLeaderTxCohort); leaderTestKit.doCommit(successTxCohort); + + // continuation of tell-based protocol: readied transaction will complete commit, but will report an OLFE + if (canCommit != null) { + final var ex = assertThrows(ExecutionException.class, + () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause(); + assertThat(ex, instanceOf(OptimisticLockFailedException.class)); + assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage()); + final var cause = ex.getCause(); + assertThat(cause, instanceOf(ConflictingModificationAppliedException.class)); + final var cmae = (ConflictingModificationAppliedException) cause; + assertEquals("Node was created by other transaction.", cmae.getMessage()); + assertEquals(CarsModel.BASE_PATH, cmae.getPath()); + } } @Test @@ -1162,17 +1174,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - try { - followerTestKit.doCommit(rwTx.ready()); - fail("Exception expected"); - } catch (final ExecutionException e) { - final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.isAssignableFrom(testParameter)) { - assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException - || e.getCause() instanceof ShardLeaderNotRespondingException); - } else { - assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); - } + final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready())); + final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()); + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { + assertTrue(msg, Throwables.getRootCause(ex) instanceof NoShardLeaderException + || ex.getCause() instanceof ShardLeaderNotRespondingException); + } else { + assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class)); } } @@ -1201,16 +1209,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - try { - followerTestKit.doCommit(rwTx.ready()); - fail("Exception expected"); - } catch (final ExecutionException e) { - final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.isAssignableFrom(testParameter)) { - assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); - } else { - assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); - } + final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready())); + final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()); + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { + assertThat(msg, Throwables.getRootCause(ex), instanceOf(NoShardLeaderException.class)); + } else { + assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class)); } } @@ -1271,10 +1275,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerTestKit.waitForMembersUp("member-1", "member-3"); follower2TestKit.waitForMembersUp("member-1", "member-2"); - TestKit.shutdownActorSystem(follower2System); + // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option + TestKit.shutdownActorSystem(follower2System, true); ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get(); - OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() .executeOperation(cars, GetOnDemandRaftState.INSTANCE); Cluster leaderCluster = Cluster.get(leaderSystem); @@ -1284,9 +1289,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { Member follower2Member = follower2Cluster.readView().self(); await().atMost(10, TimeUnit.SECONDS) - .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member)); + .until(() -> containsUnreachable(leaderCluster, follower2Member)); await().atMost(10, TimeUnit.SECONDS) - .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member)); + .until(() -> containsUnreachable(followerCluster, follower2Member)); ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get(); @@ -1308,6 +1313,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ds2.close(); } + private static Boolean containsUnreachable(final Cluster cluster, final Member member) { + // unreachableMembers() returns scala.collection.immutable.Set, but we are using scala.collection.Set to fix JDT + // see https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276#c32 + final Set members = cluster.readView().unreachableMembers(); + return members.contains(member); + } + @Test public void testInstallSnapshot() throws Exception { final String testName = "testInstallSnapshot"; @@ -1324,7 +1336,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)))); AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode); - final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty()); + final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty()); final Snapshot initialSnapshot = Snapshot.create( new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)), Collections.emptyList(), 5, 1, 5, 1, 1, null, null); @@ -1335,10 +1347,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastoresWithCars(testName); - final Optional> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read( - CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", readOptional.isPresent()); - assertEquals("Node", carsNode, readOptional.get()); + assertEquals(Optional.of(carsNode), leaderDistributedDataStore.newReadOnlyTransaction().read( + CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS)); verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class), initialSnapshot, snapshotRoot); @@ -1350,7 +1360,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadWriteMessageSlicing() throws Exception { // The slicing is only implemented for tell-based protocol - Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); + assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.maximumMessageSliceSize(100); followerDatastoreContextBuilder.maximumMessageSliceSize(100); @@ -1358,14 +1368,142 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); - final NormalizedNode carsNode = CarsModel.create(); + final NormalizedNode carsNode = CarsModel.create(); rwTx.write(CarsModel.BASE_PATH, carsNode); verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); } + @SuppressWarnings("IllegalCatch") + @Test + public void testRaftCallbackDuringLeadershipDrop() throws Exception { + final String testName = "testRaftCallbackDuringLeadershipDrop"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + + final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, + DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500) + .shardLeaderElectionTimeoutInSeconds(3600), + commitTimeout); + + final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + leaderTestKit.doCommit(initialWriteTx.ready()); + + try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) { + + final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + member2Cars.tell(new StartDropMessages(AppendEntries.class), null); + member3Cars.tell(new StartDropMessages(AppendEntries.class), null); + + final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + final AtomicBoolean submitDone = new AtomicBoolean(false); + executor.submit(() -> { + try { + leaderTestKit.doCommit(newTx.ready()); + submitDone.set(true); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1); + + final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE); + + // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with + // new term(switching to candidate after election timeout) + leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1, + "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1, + -1), member3Cars); + + member2Cars.tell(new StopDropMessages(AppendEntries.class), null); + member3Cars.tell(new StopDropMessages(AppendEntries.class), null); + + await("Is tx stuck in COMMIT_PENDING") + .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true)); + + } + + executor.shutdownNow(); + } + + @Test + public void testSnapshotOnRootOverwrite() throws Exception { + initDatastores("testSnapshotOnRootOverwrite", "module-shards-default-cars-member1-and-2.conf", + new String[] {"cars", "default"}, + leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true), + followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); + + leaderTestKit.waitForMembersUp("member-2"); + final ContainerNode rootNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) + .withChild(CarsModel.create()) + .build(); + + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + // FIXME: CONTROLLER-2020: ClientBackedDatastore does not have stable indexes/term, + // the snapshot index seems to fluctuate + assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(1, state.getSnapshotIndex())); + + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(1, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); + + for (int i = 0; i < 10; i++) { + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i), + CarsModel.newCarEntry("car " + i, Uint64.ONE)); + } + + // fake snapshot causes the snapshotIndex to move + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(10, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(10, state.getSnapshotIndex())); + + // however the real snapshot still has not changed and was taken at index 1 + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); + + // root overwrite so expect a snapshot + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + // this was a real snapshot so everything should be in it(1(DisableTrackingPayload) + 1 + 10 + 1) + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(12, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(12, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 12); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 12); + } + + private static void verifySnapshot(final String persistenceId, final long lastAppliedIndex) { + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + List snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); + assertEquals(1, snap.size()); + assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex()); + } + ); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, - final NormalizedNode expRoot) { + final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm()); assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex()); assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm()); @@ -1378,10 +1516,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) { final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build()); - final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); + final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class); final Answer answer = invocation -> newBuilder.build(); - Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext(); - Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString()); + doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext(); + doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(anyString()); dataStore.onDatastoreContextUpdated(mockContextFactory); } }