Implement suspend leader in Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 7867c91158b7d939ca9b795c84eb38b383dadb96..ef4bab44f8ebb51c65312d60822dd36fdc6d9c49 100644 (file)
@@ -284,6 +284,10 @@ public class Shard extends RaftActor {
         return commitCoordinator.getQueueSize();
     }
 
+    public int getCohortCacheSize() {
+        return commitCoordinator.getCohortCacheSize();
+    }
+
     @Override
     protected Optional<ActorRef> getRoleChangeNotifier() {
         return roleChangeNotifier;
@@ -436,12 +440,12 @@ public class Shard extends RaftActor {
         // the primary/leader shard. However with timing and caching on the front-end, there's a small
         // window where it could have a stale leader during leadership transitions.
         //
-        boolean isIsolatedLeader = isIsolatedLeader();
-        if (isLeader() && !isIsolatedLeader) {
+        boolean isLeaderActive = isLeaderActive();
+        if (isLeader() && isLeaderActive) {
             handleBatchedModificationsLocal(batched, getSender());
         } else {
             ActorSelection leader = getLeader();
-            if (isIsolatedLeader || leader == null) {
+            if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(batched, getSender(),
                         "Could not commit transaction " + batched.getTransactionID());
             } else {
@@ -473,8 +477,8 @@ public class Shard extends RaftActor {
     private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
         LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
 
-        boolean isIsolatedLeader = isIsolatedLeader();
-        if (isLeader() && !isIsolatedLeader) {
+        boolean isLeaderActive = isLeaderActive();
+        if (isLeader() && isLeaderActive) {
             try {
                 commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
             } catch (Exception e) {
@@ -484,7 +488,7 @@ public class Shard extends RaftActor {
             }
         } else {
             ActorSelection leader = getLeader();
-            if (isIsolatedLeader || leader == null) {
+            if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(message, getSender(),
                         "Could not commit transaction " + message.getTransactionID());
             } else {
@@ -498,12 +502,12 @@ public class Shard extends RaftActor {
     private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
         LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID());
 
-        boolean isIsolatedLeader = isIsolatedLeader();
-        if (isLeader() && !isIsolatedLeader) {
+        boolean isLeaderActive = isLeaderActive();
+        if (isLeader() && isLeaderActive) {
             commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
-            if (isIsolatedLeader || leader == null) {
+            if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
                         "Could not commit transaction " + forwardedReady.getTransactionID());
             } else {
@@ -722,6 +726,12 @@ public class Shard extends RaftActor {
         }
     }
 
+    @Override
+    protected void pauseLeader(Runnable operation) {
+        LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+        commitCoordinator.setRunOnPendingTransactionsComplete(operation);
+    }
+
     @Override
     public String persistenceId() {
         return this.name;