final class RemoteProxyTransaction extends AbstractProxyTransaction {
private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
- // FIXME: make this tuneable
- private static final int REQUEST_MAX_MODIFICATIONS = 1000;
-
private final ModifyTransactionRequestBuilder builder;
private final boolean sendReadyOnSeal;
private final boolean snapshotOnly;
+ private final int maxModifications;
private boolean builderBusy;
this.snapshotOnly = snapshotOnly;
this.sendReadyOnSeal = sendReadyOnSeal;
builder = new ModifyTransactionRequestBuilder(identifier, localActor());
+ maxModifications = parent.parent().actorUtils().getDatastoreContext().getShardBatchedModificationCount();
}
@Override
ensureInitializedBuilder();
builder.addModification(modification);
- if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
+ if (builder.size() >= maxModifications) {
flushBuilder(enqueuedTicks);
}
} else {
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@Mock
private DataTree tree;
+ @Mock
+ private DatastoreContext datastoreContext;
protected abstract T object();
assertNull(reconnectCohort);
}
- protected static ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
+ protected final ActorUtils createActorUtilsMock(final ActorSystem system, final ActorRef actor) {
final ActorUtils mock = mock(ActorUtils.class);
final Promise<PrimaryShardInfo> promise = new DefaultPromise<>();
final ActorSelection selection = system.actorSelection(actor.path());
final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
promise.success(shardInfo);
doReturn(promise.future()).when(mock).findPrimaryShardAsync(any());
+ doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+ doReturn(datastoreContext).when(mock).getDatastoreContext();
+
return mock;
}
}
\ No newline at end of file
import org.opendaylight.controller.cluster.access.client.InternalCommand;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private TestProbe clientActorProbe;
private TestProbe actorContextProbe;
private AbstractDataStoreClientBehavior behavior;
+ private ActorUtils util;
@Before
public void setUp() {
system = ActorSystem.apply();
clientActorProbe = new TestProbe(system, "client");
actorContextProbe = new TestProbe(system, "actor-context");
- final ActorUtils context = createActorContextMock(system, actorContextProbe.ref());
+ util = createActorContextMock(system, actorContextProbe.ref());
clientContext =
AccessClientUtil.createClientActorContext(system, clientActorProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
- behavior = createBehavior(clientContext, context);
+ behavior = createBehavior(clientContext, util);
}
@SuppressWarnings("checkstyle:hiddenField")
@Test
public void testGetConnection() {
+ final var datastoreContext = mock(DatastoreContext.class);
+ doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+ doReturn(datastoreContext).when(util).getDatastoreContext();
+
//set up data tree mock
final CursorAwareDataTreeModification modification = mock(CursorAwareDataTreeModification.class);
doReturn(Optional.empty()).when(modification).readNode(YangInstanceIdentifier.empty());
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private DataTreeSnapshot snapshot;
@Mock
private AbstractClientHistory history;
+ @Mock
+ private DatastoreContext datastoreContext;
+ @Mock
+ private ActorUtils actorUtils;
+
private ActorSystem system;
private TestProbe backendProbe;
private TestProbe clientContextProbe;
"default", UnsignedLong.ZERO, Optional.empty(), 3);
final AbstractClientConnection<ShardBackendInfo> connection =
AccessClientUtil.createConnectedConnection(context, 0L, backend);
+
final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
tester = new TransactionTester<>(transaction, connection, backendProbe);
}
+ protected final void mockForRemote() {
+ doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+ doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
+ doReturn(actorUtils).when(history).actorUtils();
+ }
+
@SuppressWarnings("checkstyle:hiddenField")
protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
final AbstractClientHistory history = mock(AbstractClientHistory.class);
+ doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+ doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
+ doReturn(actorUtils).when(history).actorUtils();
+
final ClientActorContext context =
AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.current(),
final AbstractClientConnection<ShardBackendInfo> connection =
AccessClientUtil.createConnectedConnection(context, 0L, backend);
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
+
final RemoteProxyTransaction transaction =
new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
return new TransactionTester<>(transaction, connection, backendProbe);
package org.opendaylight.controller.cluster.databroker.actors.dds;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.yangtools.yang.common.Empty;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ClientTransactionCommitCohortTest {
-
private static final String PERSISTENCE_ID = "per-1";
private static final int TRANSACTIONS = 3;
+ private final List<TransactionTester<RemoteProxyTransaction>> transactions = new ArrayList<>();
+
@Mock
private AbstractClientHistory history;
+ @Mock
+ private DatastoreContext datastoreContext;
+ @Mock
+ private ActorUtils actorUtils;
+
private ActorSystem system;
- private List<TransactionTester<RemoteProxyTransaction>> transactions;
private ClientTransactionCommitCohort cohort;
@Before
final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
final ClientActorContext context =
AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
- transactions = new ArrayList<>();
+ doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+ doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
+ doReturn(actorUtils).when(history).actorUtils();
+
for (int i = 0; i < TRANSACTIONS; i++) {
transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history));
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class DirectTransactionCommitCohortTest {
-
private static final String PERSISTENCE_ID = "per-1";
@Mock
private AbstractClientHistory history;
+ @Mock
+ private DatastoreContext datastoreContext;
+ @Mock
+ private ActorUtils actorUtils;
+
private ActorSystem system;
private TransactionTester<?> transaction;
private DirectTransactionCommitCohort cohort;
final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
final ClientActorContext context =
AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+ doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+ doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
+ doReturn(actorUtils).when(history).actorUtils();
+
transaction = createTransactionTester(new TestProbe(system, "backend"), context, history);
final AbstractProxyTransaction proxy = transaction.getTransaction();
proxy.seal();
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
public class RemoteProxyTransactionTest extends AbstractProxyTransactionTest<RemoteProxyTransaction> {
-
@Override
protected RemoteProxyTransaction createTransaction(final ProxyHistory parent, final TransactionIdentifier id,
final DataTreeSnapshot snapshot) {
+ mockForRemote();
return new RemoteProxyTransaction(parent, TRANSACTION_ID, false, false, false);
}
initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
// Verify backend statistics on start
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
- IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
// Do an initial write to get the primary shard info cached.
final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
// At this point only leader should see the transactions
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 2, stats.getReadWriteTransactionCount()));
- IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
// Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
// tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard
final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex);
readWriteTx.write(carPath, cars.getLast());
- // There is a difference here between implementations: tell-based protocol will postpone write operations until
- // either a read is made or the transaction is submitted. Here we flush out the last transaction, so we see
- // three transactions, not just the ones we have started committing
- assertTrue(readWriteTx.exists(carPath).get(2, TimeUnit.SECONDS));
+ // There is a difference here between implementations: tell-based protocol enforces batching on per-transaction
+ // level whereas ask-based protocol has a global limit towards a shard -- and hence flushes out last two
+ // transactions eagerly.
final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3;
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", earlyTxCount, stats.getReadWriteTransactionCount()));
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
// Disable elections on the leader so it switches to follower.
// At this point everything is committed and the follower datastore should see 5 transactions, but leader should
// only see the initial transactions
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", earlyTxCount, stats.getReadWriteTransactionCount()));
- IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 5);
DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
}
+ private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected)
+ throws Exception {
+ IntegrationTestKit.verifyShardStats(datastore, "cars",
+ stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount()));
+ }
+
@Test
public void testLeadershipTransferOnShutdown() throws Exception {
leaderDatastoreContextBuilder.shardBatchedModificationCount(1);