Measure follower activity in nanoseconds
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
old mode 100644 (file)
new mode 100755 (executable)
index 70a0b86..f9099a7
@@ -121,12 +121,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private RaftActorServerConfigurationSupport serverConfigurationSupport;
 
-    private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
-
     private boolean shuttingDown;
 
-    protected RaftActor(String id, Map<String, String> peerAddresses,
-         Optional<ConfigParams> configParams, short payloadVersion) {
+    protected RaftActor(final String id, final Map<String, String> peerAddresses,
+         final Optional<ConfigParams> configParams, final short payloadVersion) {
 
         persistentProvider = new PersistentDataProvider(this);
         delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
@@ -159,7 +157,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     @Override
-    protected void handleRecover(Object message) {
+    protected void handleRecover(final Object message) {
         if (raftRecovery == null) {
             raftRecovery = newRaftActorRecoverySupport();
         }
@@ -185,7 +183,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @VisibleForTesting
     @SuppressWarnings("checkstyle:IllegalCatch")
-    protected void changeCurrentBehavior(RaftActorBehavior newBehavior) {
+    protected void changeCurrentBehavior(final RaftActorBehavior newBehavior) {
         final RaftActorBehavior currentBehavior = getCurrentBehavior();
         if (currentBehavior != null) {
             try {
@@ -309,7 +307,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                                         + ". Follower is not ready to become leader")),
                         getSelf());
             }
-        }, message.getRequestedFollowerId());
+        }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
     }
 
     private boolean possiblyHandleBehaviorMessage(final Object message) {
@@ -328,30 +326,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return false;
     }
 
-    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
-        initiateLeadershipTransfer(onComplete, null);
-    }
-
     private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
-                                            final String followerId) {
+            @Nullable final String followerId, final long newLeaderTimeoutInMillis) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         if (leadershipTransferInProgress == null) {
             leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+            leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
-                public void onSuccess(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                public void onSuccess(final ActorRef raftActorRef) {
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
 
                 @Override
-                public void onFailure(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                public void onFailure(final ActorRef raftActorRef) {
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
             });
 
             leadershipTransferInProgress.addOnComplete(onComplete);
+
+            context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
             leadershipTransferInProgress.init();
+
         } else {
             LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
             leadershipTransferInProgress.addOnComplete(onComplete);
@@ -368,26 +367,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         shuttingDown = true;
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-        if (currentBehavior.state() != RaftState.Leader) {
-            // For non-leaders shutdown is a no-op
-            self().tell(PoisonPill.getInstance(), self());
-            return;
+        switch (currentBehavior.state()) {
+            case Leader:
+            case PreLeader:
+                // Fall-through to more work
+                break;
+            default:
+                // For non-leaders shutdown is a no-op
+                self().tell(PoisonPill.getInstance(), self());
+                return;
         }
 
         if (context.hasFollowers()) {
             initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
-                public void onSuccess(ActorRef raftActorRef) {
+                public void onSuccess(final ActorRef raftActorRef) {
                     LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
                 }
 
                 @Override
-                public void onFailure(ActorRef raftActorRef) {
+                public void onFailure(final ActorRef raftActorRef) {
                     LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
                 }
-            });
+            }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
         } else {
             pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
                 @Override
@@ -413,13 +417,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private void switchBehavior(SwitchBehavior message) {
+    private void switchBehavior(final SwitchBehavior message) {
         if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
             RaftState newState = message.getNewState();
             if (newState == RaftState.Leader || newState == RaftState.Follower) {
+                getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
                 switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
                     AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
-                getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
             } else {
                 LOG.warn("Switching to behavior : {} - not supported", newState);
             }
@@ -447,7 +451,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-        OnDemandRaftState.AbstractBuilder<?> builder = newOnDemandRaftStateBuilder()
+        OnDemandRaftState.AbstractBuilder<?, ?> builder = newOnDemandRaftStateBuilder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog().dataSize())
@@ -480,7 +484,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             for (String id: followerIds) {
                 final FollowerLogInformation info = leader.getFollower(id);
                 followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
-                        info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
+                        info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(
+                            TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())),
                         context.getPeerInfo(info.getId()).isVoting()));
             }
 
@@ -491,11 +496,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     }
 
-    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+    protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
         return OnDemandRaftState.builder();
     }
 
-    private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
+    private void handleBehaviorChange(final BehaviorState oldBehaviorState, final RaftActorBehavior currentBehavior) {
         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
 
         if (oldBehavior != currentBehavior) {
@@ -517,6 +522,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
 
+            RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+                    context.getRaftActorLeadershipTransferCohort();
             if (leadershipTransferInProgress != null) {
                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
             }
@@ -531,7 +538,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private void handleApplyState(ApplyState applyState) {
+    private void handleApplyState(final ApplyState applyState) {
         long startTime = System.nanoTime();
 
         Payload payload = applyState.getReplicatedLogEntry().getData();
@@ -554,7 +561,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         self().tell(applyState, self());
     }
 
-    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
+    protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+            final short leaderPayloadVersion) {
         return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
     }
 
@@ -631,7 +639,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     @VisibleForTesting
-    void setCurrentBehavior(RaftActorBehavior behavior) {
+    void setCurrentBehavior(final RaftActorBehavior behavior) {
         context.setCurrentBehavior(behavior);
     }
 
@@ -654,7 +662,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 && !shuttingDown && !isLeadershipTransferInProgress();
     }
 
-    private boolean isLeadershipTransferInProgress() {
+    protected boolean isLeadershipTransferInProgress() {
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
     }
 
@@ -697,7 +706,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return context;
     }
 
-    protected void updateConfigParams(ConfigParams configParams) {
+    protected void updateConfigParams(final ConfigParams configParams) {
 
         // obtain the RaftPolicy for oldConfigParams and the updated one.
         String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass();
@@ -729,11 +738,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return delegatingPersistenceProvider.getDelegate();
     }
 
-    public void setPersistence(DataPersistenceProvider provider) {
+    public void setPersistence(final DataPersistenceProvider provider) {
         delegatingPersistenceProvider.setDelegate(provider);
     }
 
-    protected void setPersistence(boolean persistent) {
+    protected void setPersistence(final boolean persistent) {
         DataPersistenceProvider currentPersistence = persistence();
         if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
             setPersistence(new PersistentDataProvider(this));
@@ -743,8 +752,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 captureSnapshot();
             }
         } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
-            setPersistence(new NonPersistentDataProvider() {
-                /**
+            setPersistence(new NonPersistentDataProvider(this) {
+                /*
                  * The way snapshotting works is,
                  * <ol>
                  * <li> RaftActor calls createSnapshot on the Shard
@@ -756,7 +765,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                  * </ol>
                  */
                 @Override
-                public void saveSnapshot(Object object) {
+                public void saveSnapshot(final Object object) {
                     // Make saving Snapshot successful
                     // Committing the snapshot here would end up calling commit in the creating state which would
                     // be a state violation. That's why now we send a message to commit the snapshot.
@@ -779,7 +788,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * Note that if the peerId does not match the list of peers passed to
      * this actor during construction an IllegalStateException will be thrown.
      */
-    protected void setPeerAddress(String peerId, String peerAddress) {
+    protected void setPeerAddress(final String peerId, final String peerAddress) {
         context.setPeerAddress(peerId, peerAddress);
     }
 
@@ -833,6 +842,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     protected abstract Optional<ActorRef> getRoleChangeNotifier();
 
+    /**
+     * This method is called on the leader when a voting change operation completes.
+     */
+    protected void onVotingStateChangeComplete() {
+    }
+
     /**
      * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
      * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
@@ -845,11 +860,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @param operation the operation to run
      */
-    protected void pauseLeader(Runnable operation) {
+    protected void pauseLeader(final Runnable operation) {
         operation.run();
     }
 
-    protected void onLeaderChanged(String oldLeader, String newLeader) {
+    /**
+     * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader
+     * should resume normal operations.
+     *
+     * <p>
+     * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked.
+     */
+    protected void unpauseLeader() {
+
+    }
+
+    protected void onLeaderChanged(final String oldLeader, final String newLeader) {
     }
 
     private String getLeaderAddress() {
@@ -890,13 +916,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         if (isLeader()) {
             initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
-                public void onSuccess(ActorRef raftActorRef) {
+                public void onSuccess(final ActorRef raftActorRef) {
                     LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
                     ensureFollowerState();
                 }
 
                 @Override
-                public void onFailure(ActorRef raftActorRef) {
+                public void onFailure(final ActorRef raftActorRef) {
                     LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
                     ensureFollowerState();
                 }
@@ -908,7 +934,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         initializeBehavior();
                     }
                 }
-            });
+            }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
         }
     }
 
@@ -1013,5 +1039,4 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
         }
     }
-
 }