From ba5f7cb9866163cffbae48d50355bc48cb3d0604 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 3 Apr 2023 20:14:47 +0200 Subject: [PATCH] Honor shard-batched-modification-count RemoteProxyTransaction should honor not use a hard-coded limit, but rather pick it up from DatastoreContext configuration. JIRA: CONTROLLER-2075 Change-Id: I5cf6491f76c05582b2c59e918d8af9521c01d98f Signed-off-by: Robert Varga --- .../actors/dds/RemoteProxyTransaction.java | 7 ++-- .../actors/dds/AbstractClientHistoryTest.java | 8 ++++- .../AbstractDataStoreClientBehaviorTest.java | 10 ++++-- .../dds/AbstractProxyTransactionTest.java | 20 +++++++++++ .../ClientTransactionCommitCohortTest.java | 17 +++++++-- .../DirectTransactionCommitCohortTest.java | 13 ++++++- .../dds/RemoteProxyTransactionTest.java | 2 +- ...butedDataStoreRemotingIntegrationTest.java | 35 +++++++++---------- 8 files changed, 82 insertions(+), 30 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 0f75c747f0..6288a2f8ba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -70,12 +70,10 @@ import org.slf4j.LoggerFactory; final class RemoteProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class); - // FIXME: make this tuneable - private static final int REQUEST_MAX_MODIFICATIONS = 1000; - private final ModifyTransactionRequestBuilder builder; private final boolean sendReadyOnSeal; private final boolean snapshotOnly; + private final int maxModifications; private boolean builderBusy; @@ -87,6 +85,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { this.snapshotOnly = snapshotOnly; this.sendReadyOnSeal = sendReadyOnSeal; builder = new ModifyTransactionRequestBuilder(identifier, localActor()); + maxModifications = parent.parent().actorUtils().getDatastoreContext().getShardBatchedModificationCount(); } @Override @@ -184,7 +183,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ensureInitializedBuilder(); builder.addModification(modification); - if (builder.size() >= REQUEST_MAX_MODIFICATIONS) { + if (builder.size() >= maxModifications) { flushBuilder(enqueuedTicks); } } else { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistoryTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistoryTest.java index 05531cddd9..3d9b3fd336 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistoryTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistoryTest.java @@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.access.client.AccessClientUtil; import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -42,6 +43,8 @@ public abstract class AbstractClientHistoryTest @Mock private DataTree tree; + @Mock + private DatastoreContext datastoreContext; protected abstract T object(); @@ -176,13 +179,16 @@ public abstract class AbstractClientHistoryTest assertNull(reconnectCohort); } - protected static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) { + protected final ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) { final ActorUtils mock = mock(ActorUtils.class); final Promise promise = new DefaultPromise<>(); final ActorSelection selection = system.actorSelection(actor.path()); final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0); promise.success(shardInfo); doReturn(promise.future()).when(mock).findPrimaryShardAsync(any()); + doReturn(1000).when(datastoreContext).getShardBatchedModificationCount(); + doReturn(datastoreContext).when(mock).getDatastoreContext(); + return mock; } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehaviorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehaviorTest.java index 14a29c5101..ed7bdaca14 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehaviorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehaviorTest.java @@ -32,6 +32,7 @@ import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.client.InternalCommand; import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -50,16 +51,17 @@ public abstract class AbstractDataStoreClientBehaviorTest { private TestProbe clientActorProbe; private TestProbe actorContextProbe; private AbstractDataStoreClientBehavior behavior; + private ActorUtils util; @Before public void setUp() { system = ActorSystem.apply(); clientActorProbe = new TestProbe(system, "client"); actorContextProbe = new TestProbe(system, "actor-context"); - final ActorUtils context = createActorContextMock(system, actorContextProbe.ref()); + util = createActorContextMock(system, actorContextProbe.ref()); clientContext = AccessClientUtil.createClientActorContext(system, clientActorProbe.ref(), CLIENT_ID, PERSISTENCE_ID); - behavior = createBehavior(clientContext, context); + behavior = createBehavior(clientContext, util); } @SuppressWarnings("checkstyle:hiddenField") @@ -132,6 +134,10 @@ public abstract class AbstractDataStoreClientBehaviorTest { @Test public void testGetConnection() { + final var datastoreContext = mock(DatastoreContext.class); + doReturn(1000).when(datastoreContext).getShardBatchedModificationCount(); + doReturn(datastoreContext).when(util).getDatastoreContext(); + //set up data tree mock final CursorAwareDataTreeModification modification = mock(CursorAwareDataTreeModification.class); doReturn(Optional.empty()).when(modification).readNode(YangInstanceIdentifier.empty()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java index 08a7011294..353123517f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java @@ -14,6 +14,7 @@ import static org.hamcrest.core.Is.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -60,6 +61,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -95,6 +98,11 @@ public abstract class AbstractProxyTransactionTest connection = AccessClientUtil.createConnectedConnection(context, 0L, backend); + final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID); transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot); tester = new TransactionTester<>(transaction, connection, backendProbe); } + protected final void mockForRemote() { + doReturn(1000).when(datastoreContext).getShardBatchedModificationCount(); + doReturn(datastoreContext).when(actorUtils).getDatastoreContext(); + doReturn(actorUtils).when(history).actorUtils(); + } + @SuppressWarnings("checkstyle:hiddenField") protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot); @@ -322,6 +337,10 @@ public abstract class AbstractProxyTransactionTest connection = AccessClientUtil.createConnectedConnection(context, 0L, backend); final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID); + final RemoteProxyTransaction transaction = new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false); return new TransactionTester<>(transaction, connection, backendProbe); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohortTest.java index a123b9e509..e54b275c95 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohortTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohortTest.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doReturn; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID; @@ -48,19 +49,26 @@ import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitR import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.yangtools.yang.common.Empty; @RunWith(MockitoJUnitRunner.StrictStubs.class) public class ClientTransactionCommitCohortTest { - private static final String PERSISTENCE_ID = "per-1"; private static final int TRANSACTIONS = 3; + private final List> transactions = new ArrayList<>(); + @Mock private AbstractClientHistory history; + @Mock + private DatastoreContext datastoreContext; + @Mock + private ActorUtils actorUtils; + private ActorSystem system; - private List> transactions; private ClientTransactionCommitCohort cohort; @Before @@ -69,7 +77,10 @@ public class ClientTransactionCommitCohortTest { final TestProbe clientContextProbe = new TestProbe(system, "clientContext"); final ClientActorContext context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID); - transactions = new ArrayList<>(); + doReturn(1000).when(datastoreContext).getShardBatchedModificationCount(); + doReturn(datastoreContext).when(actorUtils).getDatastoreContext(); + doReturn(actorUtils).when(history).actorUtils(); + for (int i = 0; i < TRANSACTIONS; i++) { transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history)); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohortTest.java index 3e42cd279b..c165af5750 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohortTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohortTest.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID; @@ -36,14 +37,20 @@ import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequ import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess; import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; @RunWith(MockitoJUnitRunner.StrictStubs.class) public class DirectTransactionCommitCohortTest { - private static final String PERSISTENCE_ID = "per-1"; @Mock private AbstractClientHistory history; + @Mock + private DatastoreContext datastoreContext; + @Mock + private ActorUtils actorUtils; + private ActorSystem system; private TransactionTester transaction; private DirectTransactionCommitCohort cohort; @@ -54,6 +61,10 @@ public class DirectTransactionCommitCohortTest { final TestProbe clientContextProbe = new TestProbe(system, "clientContext"); final ClientActorContext context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID); + doReturn(1000).when(datastoreContext).getShardBatchedModificationCount(); + doReturn(datastoreContext).when(actorUtils).getDatastoreContext(); + doReturn(actorUtils).when(history).actorUtils(); + transaction = createTransactionTester(new TestProbe(system, "backend"), context, history); final AbstractProxyTransaction proxy = transaction.getTransaction(); proxy.seal(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransactionTest.java index 97c4b3126f..c9324702e6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransactionTest.java @@ -44,10 +44,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot; public class RemoteProxyTransactionTest extends AbstractProxyTransactionTest { - @Override protected RemoteProxyTransaction createTransaction(final ProxyHistory parent, final TransactionIdentifier id, final DataTreeSnapshot snapshot) { + mockForRemote(); return new RemoteProxyTransaction(parent, TRANSACTION_ID, false, false, false); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index e5f3c95835..6868ddcf16 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -898,10 +898,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); // Verify backend statistics on start - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount())); - IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount())); + verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); // Do an initial write to get the primary shard info cached. @@ -943,10 +941,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); // At this point only leader should see the transactions - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 2, stats.getReadWriteTransactionCount())); - IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount())); + verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard @@ -974,13 +970,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex); readWriteTx.write(carPath, cars.getLast()); - // There is a difference here between implementations: tell-based protocol will postpone write operations until - // either a read is made or the transaction is submitted. Here we flush out the last transaction, so we see - // three transactions, not just the ones we have started committing - assertTrue(readWriteTx.exists(carPath).get(2, TimeUnit.SECONDS)); + // There is a difference here between implementations: tell-based protocol enforces batching on per-transaction + // level whereas ask-based protocol has a global limit towards a shard -- and hence flushes out last two + // transactions eagerly. final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3; - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", earlyTxCount, stats.getReadWriteTransactionCount())); + verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 0); // Disable elections on the leader so it switches to follower. @@ -1015,16 +1010,20 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // At this point everything is committed and the follower datastore should see 5 transactions, but leader should // only see the initial transactions - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", earlyTxCount, stats.getReadWriteTransactionCount())); - IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars", - stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount())); + verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount); + verifyCarsReadWriteTransactions(followerDistributedDataStore, 5); DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction(); verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()])); verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people); } + private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected) + throws Exception { + IntegrationTestKit.verifyShardStats(datastore, "cars", + stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount())); + } + @Test public void testLeadershipTransferOnShutdown() throws Exception { leaderDatastoreContextBuilder.shardBatchedModificationCount(1); -- 2.36.6