Enlarge critical section to cover processNextTransaction()
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 6851f6aab9ca4441ab79055c967d14c4c5335cc5..a36807a8c8a4bb46cc0e7743f8daa1079615e605 100644 (file)
@@ -40,6 +40,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitionin
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -170,7 +171,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     @Override
-    public void handleRecover(Object message) {
+    protected void handleRecover(Object message) {
         if(raftRecovery == null) {
             raftRecovery = newRaftActorRecoverySupport();
         }
@@ -201,16 +202,29 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
+        if(getCurrentBehavior() != null) {
+            try {
+                getCurrentBehavior().close();
+            } catch(Exception e) {
+                LOG.warn("{}: Error closing behavior {}", persistence(), getCurrentBehavior(), e);
+            }
+        }
+
         reusableBehaviorStateHolder.init(getCurrentBehavior());
         setCurrentBehavior(newBehavior);
         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
     }
 
     @Override
-    public void handleCommand(final Object message) {
-        if(serverConfigurationSupport.handleMessage(message, getSender())) {
+    protected void handleCommand(final Object message) {
+        if (serverConfigurationSupport.handleMessage(message, getSender())) {
             return;
-        } else if (message instanceof ApplyState){
+        }
+        if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
+            return;
+        }
+
+        if (message instanceof ApplyState) {
             ApplyState applyState = (ApplyState) message;
 
             long elapsedTime = (System.nanoTime() - applyState.getStartTime());
@@ -238,10 +252,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
             }
 
-        } else if (message instanceof ApplyJournalEntries){
+        } else if (message instanceof ApplyJournalEntries) {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
+                LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
             }
 
             persistence().persist(applyEntries, NoopProcedure.instance());
@@ -255,7 +269,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             onGetOnDemandRaftStats();
         } else if(message instanceof InitiateCaptureSnapshot) {
             captureSnapshot();
-        } else if(message instanceof SwitchBehavior){
+        } else if(message instanceof SwitchBehavior) {
             switchBehavior(((SwitchBehavior) message));
         } else if(message instanceof LeaderTransitioning) {
             onLeaderTransitioning();
@@ -263,12 +277,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             onShutDown();
         } else if(message instanceof Runnable) {
             ((Runnable)message).run();
-        } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
+        } else {
             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
         }
     }
 
-    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+    void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
         if(leadershipTransferInProgress == null) {
@@ -316,9 +330,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 }
             });
         } else if(currentBehavior.state() == RaftState.Leader) {
-            pauseLeader(new Runnable() {
+            pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
+                @Override
+                protected void doRun() {
+                    self().tell(PoisonPill.getInstance(), self());
+                }
+
                 @Override
-                public void run() {
+                protected void doCancel() {
                     self().tell(PoisonPill.getInstance(), self());
                 }
             });
@@ -434,6 +453,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             if(leadershipTransferInProgress != null) {
                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
             }
+
+            serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
         }
 
         if (roleChangeNotifier.isPresent() &&
@@ -532,7 +553,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected boolean isLeaderActive() {
-        return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && leadershipTransferInProgress == null;
+        return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown &&
+                !isLeadershipTransferInProgress();
+    }
+
+    private boolean isLeadershipTransferInProgress() {
+        return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
     }
 
     /**
@@ -602,10 +628,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             RaftActorBehavior behavior = currentBehavior.getDelegate();
             if(behavior instanceof Follower) {
                 String previousLeaderId = ((Follower)behavior).getLeaderId();
+                short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
 
                 LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
 
-                changeCurrentBehavior(new Follower(context, previousLeaderId));
+                changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
             } else {
                 initializeBehavior();
             }
@@ -718,8 +745,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     /**
      * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
      * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
-     * work prior to performing the operation. On completion of any work, the run method must be called to
-     * proceed with the given operation.
+     * work prior to performing the operation. On completion of any work, the run method must be called on the
+     * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
+     * this actor's thread dispatcher as as it modifies internal state.
      * <p>
      * The default implementation immediately runs the operation.
      *
@@ -849,7 +877,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         @Override
         public RaftActorBehavior get() {
             if(this.message instanceof SwitchBehavior){
-                return ((SwitchBehavior) message).getNewState().createBehavior(getRaftActorContext());
+                return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState());
             }
             return currentBehavior.handleMessage(sender, message);
         }