Honor shard-batched-modification-count 85/105185/6
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 3 Apr 2023 18:14:47 +0000 (20:14 +0200)
committerRobert Varga <nite@hq.sk>
Tue, 4 Apr 2023 08:19:24 +0000 (08:19 +0000)
RemoteProxyTransaction should honor not use a hard-coded limit, but
rather pick it up from DatastoreContext configuration.

JIRA: CONTROLLER-2075
Change-Id: I5cf6491f76c05582b2c59e918d8af9521c01d98f
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistoryTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehaviorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohortTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohortTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index 0f75c747f02273bd11d278b0432199ae4448de9e..6288a2f8ba0e5c098096a1df05257af018a0af39 100644 (file)
@@ -70,12 +70,10 @@ import org.slf4j.LoggerFactory;
 final class RemoteProxyTransaction extends AbstractProxyTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
 
-    // FIXME: make this tuneable
-    private static final int REQUEST_MAX_MODIFICATIONS = 1000;
-
     private final ModifyTransactionRequestBuilder builder;
     private final boolean sendReadyOnSeal;
     private final boolean snapshotOnly;
+    private final int maxModifications;
 
     private boolean builderBusy;
 
@@ -87,6 +85,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         this.snapshotOnly = snapshotOnly;
         this.sendReadyOnSeal = sendReadyOnSeal;
         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
+        maxModifications = parent.parent().actorUtils().getDatastoreContext().getShardBatchedModificationCount();
     }
 
     @Override
@@ -184,7 +183,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureInitializedBuilder();
 
             builder.addModification(modification);
-            if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
+            if (builder.size() >= maxModifications) {
                 flushBuilder(enqueuedTicks);
             }
         } else {
index 05531cddd98b2a58741f627ee4c912ad9056017b..3d9b3fd3365eced36b2e4b926dc2aa670517806e 100644 (file)
@@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -42,6 +43,8 @@ public abstract class AbstractClientHistoryTest<T extends AbstractClientHistory>
 
     @Mock
     private DataTree tree;
+    @Mock
+    private DatastoreContext datastoreContext;
 
     protected abstract T object();
 
@@ -176,13 +179,16 @@ public abstract class AbstractClientHistoryTest<T extends AbstractClientHistory>
         assertNull(reconnectCohort);
     }
 
-    protected static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
+    protected final ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
         final ActorUtils mock = mock(ActorUtils.class);
         final Promise<PrimaryShardInfo> promise = new DefaultPromise<>();
         final ActorSelection selection = system.actorSelection(actor.path());
         final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
         promise.success(shardInfo);
         doReturn(promise.future()).when(mock).findPrimaryShardAsync(any());
+        doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+        doReturn(datastoreContext).when(mock).getDatastoreContext();
+
         return mock;
     }
 }
\ No newline at end of file
index 14a29c5101394be7a971802769bfa7f1ecd40e25..ed7bdaca14137233b80401a4ae35dff1de8ca5ac 100644 (file)
@@ -32,6 +32,7 @@ import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.client.InternalCommand;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -50,16 +51,17 @@ public abstract class AbstractDataStoreClientBehaviorTest {
     private TestProbe clientActorProbe;
     private TestProbe actorContextProbe;
     private AbstractDataStoreClientBehavior behavior;
+    private ActorUtils util;
 
     @Before
     public void setUp() {
         system = ActorSystem.apply();
         clientActorProbe = new TestProbe(system, "client");
         actorContextProbe = new TestProbe(system, "actor-context");
-        final ActorUtils context = createActorContextMock(system, actorContextProbe.ref());
+        util = createActorContextMock(system, actorContextProbe.ref());
         clientContext =
                 AccessClientUtil.createClientActorContext(system, clientActorProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
-        behavior = createBehavior(clientContext, context);
+        behavior = createBehavior(clientContext, util);
     }
 
     @SuppressWarnings("checkstyle:hiddenField")
@@ -132,6 +134,10 @@ public abstract class AbstractDataStoreClientBehaviorTest {
 
     @Test
     public void testGetConnection() {
+        final var datastoreContext = mock(DatastoreContext.class);
+        doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+        doReturn(datastoreContext).when(util).getDatastoreContext();
+
         //set up data tree mock
         final CursorAwareDataTreeModification modification = mock(CursorAwareDataTreeModification.class);
         doReturn(Optional.empty()).when(modification).readNode(YangInstanceIdentifier.empty());
index 08a70112942a6b69a405f8aea9468251f6497e75..353123517f18cde470da5e221558564e5eff9daa 100644 (file)
@@ -14,6 +14,7 @@ import static org.hamcrest.core.Is.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -60,6 +61,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -95,6 +98,11 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
     private DataTreeSnapshot snapshot;
     @Mock
     private AbstractClientHistory history;
+    @Mock
+    private DatastoreContext datastoreContext;
+    @Mock
+    private ActorUtils actorUtils;
+
     private ActorSystem system;
     private TestProbe backendProbe;
     private TestProbe clientContextProbe;
@@ -113,11 +121,18 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
         final AbstractClientConnection<ShardBackendInfo> connection =
                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
+
         final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
         transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
         tester = new TransactionTester<>(transaction, connection, backendProbe);
     }
 
+    protected final void mockForRemote() {
+        doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+        doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
+        doReturn(actorUtils).when(history).actorUtils();
+    }
+
     @SuppressWarnings("checkstyle:hiddenField")
     protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
 
@@ -322,6 +337,10 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
         final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
         final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
         final AbstractClientHistory history = mock(AbstractClientHistory.class);
+        doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+        doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
+        doReturn(actorUtils).when(history).actorUtils();
+
         final ClientActorContext context =
                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.current(),
@@ -329,6 +348,7 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
         final AbstractClientConnection<ShardBackendInfo> connection =
                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
+
         final RemoteProxyTransaction transaction =
                 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
         return new TransactionTester<>(transaction, connection, backendProbe);
index a123b9e50907372cd2017e8c85aca2cf634e543c..e54b275c9516fd858aaeb6a0461f0ac1ed2e1c70 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
@@ -48,19 +49,26 @@ import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitR
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.yangtools.yang.common.Empty;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class ClientTransactionCommitCohortTest {
-
     private static final String PERSISTENCE_ID = "per-1";
     private static final int TRANSACTIONS = 3;
 
+    private final List<TransactionTester<RemoteProxyTransaction>> transactions = new ArrayList<>();
+
     @Mock
     private AbstractClientHistory history;
+    @Mock
+    private DatastoreContext datastoreContext;
+    @Mock
+    private ActorUtils actorUtils;
+
     private ActorSystem system;
-    private List<TransactionTester<RemoteProxyTransaction>> transactions;
     private ClientTransactionCommitCohort cohort;
 
     @Before
@@ -69,7 +77,10 @@ public class ClientTransactionCommitCohortTest {
         final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
         final ClientActorContext context =
                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
-        transactions = new ArrayList<>();
+        doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+        doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
+        doReturn(actorUtils).when(history).actorUtils();
+
         for (int i = 0; i < TRANSACTIONS; i++) {
             transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history));
         }
index 3e42cd279b21f1454c4fb42a5d1e92d843bcb0dd..c165af5750ea9587769e19a4e51ac80ae2822923 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
@@ -36,14 +37,20 @@ import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequ
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class DirectTransactionCommitCohortTest {
-
     private static final String PERSISTENCE_ID = "per-1";
 
     @Mock
     private AbstractClientHistory history;
+    @Mock
+    private DatastoreContext datastoreContext;
+    @Mock
+    private ActorUtils actorUtils;
+
     private ActorSystem system;
     private TransactionTester<?> transaction;
     private DirectTransactionCommitCohort cohort;
@@ -54,6 +61,10 @@ public class DirectTransactionCommitCohortTest {
         final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
         final ClientActorContext context =
                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+        doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+        doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
+        doReturn(actorUtils).when(history).actorUtils();
+
         transaction = createTransactionTester(new TestProbe(system, "backend"), context, history);
         final AbstractProxyTransaction proxy = transaction.getTransaction();
         proxy.seal();
index 97c4b3126f6bb46e3e3199eff6e50a7477f4e531..c9324702e670b59545ef6e2ccadf9e87331133e2 100644 (file)
@@ -44,10 +44,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
 
 public class RemoteProxyTransactionTest extends AbstractProxyTransactionTest<RemoteProxyTransaction> {
-
     @Override
     protected RemoteProxyTransaction createTransaction(final ProxyHistory parent, final TransactionIdentifier id,
                                                        final DataTreeSnapshot snapshot) {
+        mockForRemote();
         return new RemoteProxyTransaction(parent, TRANSACTION_ID, false, false, false);
     }
 
index e5f3c958354114fa0e2dade15b728b60882be941..6868ddcf1618b0bedaabfdb8edd0061adafd6264 100644 (file)
@@ -898,10 +898,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
 
         // Verify backend statistics on start
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
-        IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
 
         // Do an initial write to get the primary shard info cached.
 
@@ -943,10 +941,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
 
         // At this point only leader should see the transactions
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 2, stats.getReadWriteTransactionCount()));
-        IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
 
         // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
         // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard
@@ -974,13 +970,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex);
         readWriteTx.write(carPath, cars.getLast());
 
-        // There is a difference here between implementations: tell-based protocol will postpone write operations until
-        // either a read is made or the transaction is submitted. Here we flush out the last transaction, so we see
-        // three transactions, not just the ones we have started committing
-        assertTrue(readWriteTx.exists(carPath).get(2, TimeUnit.SECONDS));
+        // There is a difference here between implementations: tell-based protocol enforces batching on per-transaction
+        // level whereas ask-based protocol has a global limit towards a shard -- and hence flushes out last two
+        // transactions eagerly.
         final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3;
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", earlyTxCount, stats.getReadWriteTransactionCount()));
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
 
         // Disable elections on the leader so it switches to follower.
 
@@ -1015,16 +1010,20 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // At this point everything is committed and the follower datastore should see 5 transactions, but leader should
         // only see the initial transactions
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", earlyTxCount, stats.getReadWriteTransactionCount()));
-        IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
+        verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+        verifyCarsReadWriteTransactions(followerDistributedDataStore, 5);
 
         DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
         verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
         verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
     }
 
+    private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected)
+            throws Exception {
+        IntegrationTestKit.verifyShardStats(datastore, "cars",
+            stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount()));
+    }
+
     @Test
     public void testLeadershipTransferOnShutdown() throws Exception {
         leaderDatastoreContextBuilder.shardBatchedModificationCount(1);