Abort pending txn's in Shard on leader transition and shutdown
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index ef4bab44f8ebb51c65312d60822dd36fdc6d9c49..f5ca3a86ec53e7ecacbfb3a617fa977c909c5cc5 100644 (file)
@@ -189,6 +189,8 @@ public class Shard extends RaftActor {
             txCommitTimeoutCheckSchedule.cancel();
         }
 
+        commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
+
         shardMBean.unregisterMBean();
     }
 
@@ -252,7 +254,7 @@ public class Shard extends RaftActor {
                 setPeerAddress(resolved.getPeerId().toString(),
                         resolved.getPeerAddress());
             } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
-                handleTransactionCommitTimeoutCheck();
+                commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
             } else if(message instanceof DatastoreContext) {
                 onDatastoreContext((DatastoreContext)message);
             } else if(message instanceof RegisterRoleChangeListener){
@@ -316,20 +318,6 @@ public class Shard extends RaftActor {
         updateConfigParams(datastoreContext.getShardRaftConfig());
     }
 
-    private void handleTransactionCommitTimeoutCheck() {
-        CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
-        if(cohortEntry != null) {
-            if(cohortEntry.isExpired(transactionCommitTimeout)) {
-                LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
-                        persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
-
-                doAbortTransaction(cohortEntry.getTransactionID(), null);
-            }
-        }
-
-        commitCoordinator.cleanupExpiredCohortEntries();
-    }
-
     private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
         return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
     }
@@ -710,6 +698,9 @@ public class Shard extends RaftActor {
             }
 
             store.closeAllTransactionChains();
+
+            commitCoordinator.abortPendingTransactions(
+                    "The transacton was aborted due to inflight leadership change.", this);
         }
 
         if(hasLeader && !isIsolatedLeader()) {