Disable transaction tracking for ask-based protocol 40/82140/8
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 16 May 2019 13:54:29 +0000 (15:54 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Mon, 20 May 2019 14:25:16 +0000 (14:25 +0000)
When we encounter an ask-based protocol message on a leader,
we persist a payload to make sure no state tracking occurs
for that client.

JIRA: CONTROLLER-1879
Change-Id: I3d12a06ce9e5b65ada5b59bde9f9b5486e5e4ef7
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index fb6b0142fe440905bbdd55c81c3ff23772ee8c9b..718b0641dacaedd33af3852497a074ec236a84fa 100644 (file)
@@ -649,13 +649,14 @@ public class Shard extends RaftActor {
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
+        final TransactionIdentifier txId = commit.getTransactionId();
         if (isLeader()) {
-            commitCoordinator.handleCommit(commit.getTransactionId(), getSender(), this);
+            askProtocolEncountered(txId);
+            commitCoordinator.handleCommit(txId, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
-                messageRetrySupport.addMessageToRetry(commit, getSender(),
-                        "Could not commit transaction " + commit.getTransactionId());
+                messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId);
             } else {
                 LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
                 leader.forward(commit, getContext());
@@ -664,15 +665,17 @@ public class Shard extends RaftActor {
     }
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
-        LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionId());
+        final TransactionIdentifier txId = canCommit.getTransactionId();
+        LOG.debug("{}: Can committing transaction {}", persistenceId(), txId);
 
         if (isLeader()) {
-            commitCoordinator.handleCanCommit(canCommit.getTransactionId(), getSender(), this);
+            askProtocolEncountered(txId);
+            commitCoordinator.handleCanCommit(txId, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
                 messageRetrySupport.addMessageToRetry(canCommit, getSender(),
-                        "Could not canCommit transaction " + canCommit.getTransactionId());
+                        "Could not canCommit transaction " + txId);
             } else {
                 LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
                 leader.forward(canCommit, getContext());
@@ -682,6 +685,8 @@ public class Shard extends RaftActor {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
+        askProtocolEncountered(batched.getTransactionId());
+
         try {
             commitCoordinator.handleBatchedModifications(batched, sender, this);
         } catch (Exception e) {
@@ -778,6 +783,7 @@ public class Shard extends RaftActor {
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
+            askProtocolEncountered(forwardedReady.getTransactionId());
             commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
@@ -797,7 +803,9 @@ public class Shard extends RaftActor {
     }
 
     private void handleAbortTransaction(final AbortTransaction abort) {
-        doAbortTransaction(abort.getTransactionId(), getSender());
+        final TransactionIdentifier transactionId = abort.getTransactionId();
+        askProtocolEncountered(transactionId);
+        doAbortTransaction(transactionId, getSender());
     }
 
     void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
@@ -818,6 +826,8 @@ public class Shard extends RaftActor {
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
         if (isLeader()) {
             final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+            askProtocolEncountered(id.getClientId());
+
             // FIXME: CONTROLLER-1628: stage purge once no transactions are present
             store.closeTransactionChain(id, null);
             store.purgeTransactionChain(id, null);
@@ -830,6 +840,8 @@ public class Shard extends RaftActor {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void createTransaction(final CreateTransaction createTransaction) {
+        askProtocolEncountered(createTransaction.getTransactionId());
+
         try {
             if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY
                     && failIfIsolatedLeader(getSender())) {
@@ -852,6 +864,21 @@ public class Shard extends RaftActor {
             transactionId);
     }
 
+    // Called on leader only
+    private void askProtocolEncountered(final TransactionIdentifier transactionId) {
+        askProtocolEncountered(transactionId.getHistoryId().getClientId());
+    }
+
+    // Called on leader only
+    private void askProtocolEncountered(final ClientIdentifier clientId) {
+        final LeaderFrontendState state = knownFrontends.get(clientId.getFrontendId());
+        if (state instanceof LeaderFrontendState.Enabled) {
+            LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId);
+            persistPayload(clientId, DisableTrackingPayload.create(clientId,
+                datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
+        }
+    }
+
     private void updateSchemaContext(final UpdateSchemaContext message) {
         updateSchemaContext(message.getSchemaContext());
     }