From: Robert Varga Date: Thu, 16 May 2019 13:54:29 +0000 (+0200) Subject: Disable transaction tracking for ask-based protocol X-Git-Tag: release/sodium~81 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c413251d63152cc70fce767de0a3776f16b9a27d Disable transaction tracking for ask-based protocol When we encounter an ask-based protocol message on a leader, we persist a payload to make sure no state tracking occurs for that client. JIRA: CONTROLLER-1879 Change-Id: I3d12a06ce9e5b65ada5b59bde9f9b5486e5e4ef7 Signed-off-by: Robert Varga Signed-off-by: Tomas Cere --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index fb6b0142fe..718b0641da 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -649,13 +649,14 @@ public class Shard extends RaftActor { } private void handleCommitTransaction(final CommitTransaction commit) { + final TransactionIdentifier txId = commit.getTransactionId(); if (isLeader()) { - commitCoordinator.handleCommit(commit.getTransactionId(), getSender(), this); + askProtocolEncountered(txId); + commitCoordinator.handleCommit(txId, getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { - messageRetrySupport.addMessageToRetry(commit, getSender(), - "Could not commit transaction " + commit.getTransactionId()); + messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId); } else { LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader); leader.forward(commit, getContext()); @@ -664,15 +665,17 @@ public class Shard extends RaftActor { } private void handleCanCommitTransaction(final CanCommitTransaction canCommit) { - LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionId()); + final TransactionIdentifier txId = canCommit.getTransactionId(); + LOG.debug("{}: Can committing transaction {}", persistenceId(), txId); if (isLeader()) { - commitCoordinator.handleCanCommit(canCommit.getTransactionId(), getSender(), this); + askProtocolEncountered(txId); + commitCoordinator.handleCanCommit(txId, getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { messageRetrySupport.addMessageToRetry(canCommit, getSender(), - "Could not canCommit transaction " + canCommit.getTransactionId()); + "Could not canCommit transaction " + txId); } else { LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader); leader.forward(canCommit, getContext()); @@ -682,6 +685,8 @@ public class Shard extends RaftActor { @SuppressWarnings("checkstyle:IllegalCatch") protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { + askProtocolEncountered(batched.getTransactionId()); + try { commitCoordinator.handleBatchedModifications(batched, sender, this); } catch (Exception e) { @@ -778,6 +783,7 @@ public class Shard extends RaftActor { boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { + askProtocolEncountered(forwardedReady.getTransactionId()); commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); } else { ActorSelection leader = getLeader(); @@ -797,7 +803,9 @@ public class Shard extends RaftActor { } private void handleAbortTransaction(final AbortTransaction abort) { - doAbortTransaction(abort.getTransactionId(), getSender()); + final TransactionIdentifier transactionId = abort.getTransactionId(); + askProtocolEncountered(transactionId); + doAbortTransaction(transactionId, getSender()); } void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { @@ -818,6 +826,8 @@ public class Shard extends RaftActor { private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { if (isLeader()) { final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier(); + askProtocolEncountered(id.getClientId()); + // FIXME: CONTROLLER-1628: stage purge once no transactions are present store.closeTransactionChain(id, null); store.purgeTransactionChain(id, null); @@ -830,6 +840,8 @@ public class Shard extends RaftActor { @SuppressWarnings("checkstyle:IllegalCatch") private void createTransaction(final CreateTransaction createTransaction) { + askProtocolEncountered(createTransaction.getTransactionId()); + try { if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY && failIfIsolatedLeader(getSender())) { @@ -852,6 +864,21 @@ public class Shard extends RaftActor { transactionId); } + // Called on leader only + private void askProtocolEncountered(final TransactionIdentifier transactionId) { + askProtocolEncountered(transactionId.getHistoryId().getClientId()); + } + + // Called on leader only + private void askProtocolEncountered(final ClientIdentifier clientId) { + final LeaderFrontendState state = knownFrontends.get(clientId.getFrontendId()); + if (state instanceof LeaderFrontendState.Enabled) { + LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId); + persistPayload(clientId, DisableTrackingPayload.create(clientId, + datastoreContext.getInitialPayloadSerializedBufferCapacity()), false); + } + } + private void updateSchemaContext(final UpdateSchemaContext message) { updateSchemaContext(message.getSchemaContext()); }