Abort pending txn's in Shard on leader transition and shutdown 79/31979/5
authorTom Pantelis <tpanteli@brocade.com>
Wed, 30 Dec 2015 11:07:31 +0000 (06:07 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 8 Jan 2016 02:55:42 +0000 (02:55 +0000)
Added code in the ShardCommitCoordinator to abort pending txn's with an
appropriate failure message when leadership is lost and on shutdown.

Also moved the handleTransactionCommitTimeoutCheck logic from the Shard
to the ShardCommitCoordinator for consistency.

Change-Id: I4af1262aba76909536348a07a368f1559714f90d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java

index ef4bab4..f5ca3a8 100644 (file)
@@ -189,6 +189,8 @@ public class Shard extends RaftActor {
             txCommitTimeoutCheckSchedule.cancel();
         }
 
+        commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
+
         shardMBean.unregisterMBean();
     }
 
@@ -252,7 +254,7 @@ public class Shard extends RaftActor {
                 setPeerAddress(resolved.getPeerId().toString(),
                         resolved.getPeerAddress());
             } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
-                handleTransactionCommitTimeoutCheck();
+                commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
             } else if(message instanceof DatastoreContext) {
                 onDatastoreContext((DatastoreContext)message);
             } else if(message instanceof RegisterRoleChangeListener){
@@ -316,20 +318,6 @@ public class Shard extends RaftActor {
         updateConfigParams(datastoreContext.getShardRaftConfig());
     }
 
-    private void handleTransactionCommitTimeoutCheck() {
-        CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
-        if(cohortEntry != null) {
-            if(cohortEntry.isExpired(transactionCommitTimeout)) {
-                LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
-                        persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
-
-                doAbortTransaction(cohortEntry.getTransactionID(), null);
-            }
-        }
-
-        commitCoordinator.cleanupExpiredCohortEntries();
-    }
-
     private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
         return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
     }
@@ -710,6 +698,9 @@ public class Shard extends RaftActor {
             }
 
             store.closeAllTransactionChains();
+
+            commitCoordinator.abortPendingTransactions(
+                    "The transacton was aborted due to inflight leadership change.", this);
         }
 
         if(hasLeader && !isIsolatedLeader()) {
index fd55cee..57c5b1d 100644 (file)
@@ -8,14 +8,16 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.actor.Status;
+import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
@@ -116,7 +118,7 @@ class ShardCommitCoordinator {
                                   " capacity %d has been reached.",
                                   name, cohortEntry.getTransactionID(), queueCapacity));
             log.error(ex.getMessage());
-            sender.tell(new Status.Failure(ex), shard.self());
+            sender.tell(new Failure(ex), shard.self());
             return false;
         }
     }
@@ -312,7 +314,7 @@ class ShardCommitCoordinator {
             IllegalStateException ex = new IllegalStateException(
                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
             log.error(ex.getMessage());
-            sender.tell(new Status.Failure(ex), shard.self());
+            sender.tell(new Failure(ex), shard.self());
             return;
         }
 
@@ -333,7 +335,7 @@ class ShardCommitCoordinator {
                 if(canCommit) {
                     doCommit(cohortEntry);
                 } else {
-                    cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
+                    cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
                 }
             } else {
@@ -349,7 +351,7 @@ class ShardCommitCoordinator {
                 failure = e.getCause();
             }
 
-            cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
+            cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
         } finally {
             if(!canCommit) {
                 // Remove the entry from the cache now.
@@ -379,7 +381,7 @@ class ShardCommitCoordinator {
         } catch (Exception e) {
             log.error("{} An exception occurred while preCommitting transaction {}",
                     name, cohortEntry.getTransactionID(), e);
-            cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
+            cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
 
             currentTransactionComplete(cohortEntry.getTransactionID(), true);
         }
@@ -406,7 +408,7 @@ class ShardCommitCoordinator {
                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
                             name, transactionID));
             log.error(ex.getMessage());
-            sender.tell(new akka.actor.Status.Failure(ex), shard.self());
+            sender.tell(new Failure(ex), shard.self());
             return false;
         }
 
@@ -444,7 +446,43 @@ class ShardCommitCoordinator {
             log.error("{}: An exception happened during abort", name, e);
 
             if(sender != null) {
-                sender.tell(new akka.actor.Status.Failure(e), self);
+                sender.tell(new Failure(e), self);
+            }
+        }
+    }
+
+    void checkForExpiredTransactions(final long timeout, final Shard shard) {
+        CohortEntry cohortEntry = getCurrentCohortEntry();
+        if(cohortEntry != null) {
+            if(cohortEntry.isExpired(timeout)) {
+                log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
+                        name, cohortEntry.getTransactionID(), timeout);
+
+                handleAbort(cohortEntry.getTransactionID(), null, shard);
+            }
+        }
+
+        cleanupExpiredCohortEntries();
+    }
+
+    void abortPendingTransactions(final String reason, final Shard shard) {
+        if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+            return;
+        }
+
+        List<CohortEntry> cohortEntries = new ArrayList<>();
+
+        if(currentCohortEntry != null) {
+            cohortEntries.add(currentCohortEntry);
+            currentCohortEntry = null;
+        }
+
+        cohortEntries.addAll(queuedCohortEntries);
+        queuedCohortEntries.clear();
+
+        for(CohortEntry cohortEntry: cohortEntries) {
+            if(cohortEntry.getReplySender() != null) {
+                cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
             }
         }
     }
@@ -457,7 +495,7 @@ class ShardCommitCoordinator {
      * @return the current CohortEntry or null if the given transaction ID does not match the
      *         current entry.
      */
-    public CohortEntry getCohortEntryIfCurrent(String transactionID) {
+    CohortEntry getCohortEntryIfCurrent(String transactionID) {
         if(isCurrentTransaction(transactionID)) {
             return currentCohortEntry;
         }
@@ -465,15 +503,15 @@ class ShardCommitCoordinator {
         return null;
     }
 
-    public CohortEntry getCurrentCohortEntry() {
+    CohortEntry getCurrentCohortEntry() {
         return currentCohortEntry;
     }
 
-    public CohortEntry getAndRemoveCohortEntry(String transactionID) {
+    CohortEntry getAndRemoveCohortEntry(String transactionID) {
         return cohortCache.remove(transactionID);
     }
 
-    public boolean isCurrentTransaction(String transactionID) {
+    boolean isCurrentTransaction(String transactionID) {
         return currentCohortEntry != null &&
                 currentCohortEntry.getTransactionID().equals(transactionID);
     }
@@ -487,7 +525,7 @@ class ShardCommitCoordinator {
      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
      *        the cache.
      */
-    public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+    void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
         if(removeCohortEntry) {
             cohortCache.remove(transactionID);
         }