Disable transaction tracking for ask-based protocol 16/82216/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 16 May 2019 13:54:29 +0000 (15:54 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 23 May 2019 10:55:55 +0000 (12:55 +0200)
When we encounter an ask-based protocol message on a leader,
we persist a payload to make sure no state tracking occurs
for that client.

JIRA: CONTROLLER-1879
Change-Id: I3d12a06ce9e5b65ada5b59bde9f9b5486e5e4ef7
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
(cherry picked from commit c413251d63152cc70fce767de0a3776f16b9a27d)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index ad0fa5eaff235103b200754f769779dd2c0f12dc..55f932b2f01bca0a582ace6d40ae6f6055be7bc7 100644 (file)
@@ -652,13 +652,14 @@ public class Shard extends RaftActor {
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
+        final TransactionIdentifier txId = commit.getTransactionId();
         if (isLeader()) {
-            commitCoordinator.handleCommit(commit.getTransactionId(), getSender(), this);
+            askProtocolEncountered(txId);
+            commitCoordinator.handleCommit(txId, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
-                messageRetrySupport.addMessageToRetry(commit, getSender(),
-                        "Could not commit transaction " + commit.getTransactionId());
+                messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId);
             } else {
                 LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
                 leader.forward(commit, getContext());
@@ -667,15 +668,17 @@ public class Shard extends RaftActor {
     }
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
-        LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionId());
+        final TransactionIdentifier txId = canCommit.getTransactionId();
+        LOG.debug("{}: Can committing transaction {}", persistenceId(), txId);
 
         if (isLeader()) {
-            commitCoordinator.handleCanCommit(canCommit.getTransactionId(), getSender(), this);
+            askProtocolEncountered(txId);
+            commitCoordinator.handleCanCommit(txId, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
                 messageRetrySupport.addMessageToRetry(canCommit, getSender(),
-                        "Could not canCommit transaction " + canCommit.getTransactionId());
+                        "Could not canCommit transaction " + txId);
             } else {
                 LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
                 leader.forward(canCommit, getContext());
@@ -685,6 +688,8 @@ public class Shard extends RaftActor {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
+        askProtocolEncountered(batched.getTransactionId());
+
         try {
             commitCoordinator.handleBatchedModifications(batched, sender, this);
         } catch (Exception e) {
@@ -781,6 +786,7 @@ public class Shard extends RaftActor {
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
+            askProtocolEncountered(forwardedReady.getTransactionId());
             commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
@@ -800,7 +806,9 @@ public class Shard extends RaftActor {
     }
 
     private void handleAbortTransaction(final AbortTransaction abort) {
-        doAbortTransaction(abort.getTransactionId(), getSender());
+        final TransactionIdentifier transactionId = abort.getTransactionId();
+        askProtocolEncountered(transactionId);
+        doAbortTransaction(transactionId, getSender());
     }
 
     void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
@@ -821,6 +829,8 @@ public class Shard extends RaftActor {
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
         if (isLeader()) {
             final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+            askProtocolEncountered(id.getClientId());
+
             // FIXME: CONTROLLER-1628: stage purge once no transactions are present
             store.closeTransactionChain(id, null);
             store.purgeTransactionChain(id, null);
@@ -833,6 +843,8 @@ public class Shard extends RaftActor {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void createTransaction(final CreateTransaction createTransaction) {
+        askProtocolEncountered(createTransaction.getTransactionId());
+
         try {
             if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY
                     && failIfIsolatedLeader(getSender())) {
@@ -855,6 +867,21 @@ public class Shard extends RaftActor {
             transactionId);
     }
 
+    // Called on leader only
+    private void askProtocolEncountered(final TransactionIdentifier transactionId) {
+        askProtocolEncountered(transactionId.getHistoryId().getClientId());
+    }
+
+    // Called on leader only
+    private void askProtocolEncountered(final ClientIdentifier clientId) {
+        final LeaderFrontendState state = knownFrontends.get(clientId.getFrontendId());
+        if (state instanceof LeaderFrontendState.Enabled) {
+            LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId);
+            persistPayload(clientId, DisableTrackingPayload.create(clientId,
+                datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
+        }
+    }
+
     private void updateSchemaContext(final UpdateSchemaContext message) {
         updateSchemaContext(message.getSchemaContext());
     }