Use java.util.function.Supplier instead of Guava
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index a36807a8c8a4bb46cc0e7743f8daa1079615e605..8d5536f9245b29335c51e6d94e5a87f755ad99d9 100644 (file)
@@ -16,7 +16,6 @@ import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.Collection;
@@ -24,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -41,7 +41,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
-import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -100,12 +99,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
-    /**
-     * The current state determines the current behavior of a RaftActor
-     * A Raft Actor always starts off in the Follower State
-     */
-    private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior();
-
     /**
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
@@ -143,7 +136,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             delegatingPersistenceProvider, LOG);
 
         context.setPayloadVersion(payloadVersion);
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
     }
 
     @Override
@@ -159,14 +152,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void postStop() {
-        if(currentBehavior.getDelegate() != null) {
-            try {
-                currentBehavior.close();
-            } catch (Exception e) {
-                LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
-            }
-        }
-
+        context.close();
         super.postStop();
     }
 
@@ -194,13 +180,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
-        return new RaftActorRecoverySupport(context, currentBehavior, getRaftActorRecoveryCohort());
+        return new RaftActorRecoverySupport(context, getRaftActorRecoveryCohort());
     }
 
-    protected void initializeBehavior(){
+    @VisibleForTesting
+    void initializeBehavior(){
         changeCurrentBehavior(new Follower(context));
     }
 
+    @VisibleForTesting
     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
         if(getCurrentBehavior() != null) {
             try {
@@ -249,7 +237,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 // and recovery shows data missing
                 context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
 
-                context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+                context.getSnapshotManager().trimLog(context.getLastApplied());
             }
 
         } else if (message instanceof ApplyJournalEntries) {
@@ -282,19 +270,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
         if(leadershipTransferInProgress == null) {
-            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender());
+            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
-                public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+                public void onSuccess(ActorRef raftActorRef) {
                     leadershipTransferInProgress = null;
                 }
 
                 @Override
-                public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+                public void onFailure(ActorRef raftActorRef) {
                     leadershipTransferInProgress = null;
                 }
             });
@@ -315,21 +303,29 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         shuttingDown = true;
-        if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) {
+
+        final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+        if (currentBehavior.state() != RaftState.Leader) {
+            // For non-leaders shutdown is a no-op
+            self().tell(PoisonPill.getInstance(), self());
+            return;
+        }
+
+        if (context.hasFollowers()) {
             initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
-                public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+                public void onSuccess(ActorRef raftActorRef) {
                     LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
                 }
 
                 @Override
-                public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+                public void onFailure(ActorRef raftActorRef) {
                     LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
                 }
             });
-        } else if(currentBehavior.state() == RaftState.Leader) {
+        } else {
             pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
                 @Override
                 protected void doRun() {
@@ -341,17 +337,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     self().tell(PoisonPill.getInstance(), self());
                 }
             });
-        } else {
-            self().tell(PoisonPill.getInstance(), self());
         }
     }
 
     private void onLeaderTransitioning() {
         LOG.debug("{}: onLeaderTransitioning", persistenceId());
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if(currentBehavior.state() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+        if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
-                    currentBehavior.getLeaderPayloadVersion()), getSelf());
+                getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
         }
     }
 
@@ -375,9 +369,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
     }
 
-    protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
-        return new RaftActorSnapshotMessageSupport(context, currentBehavior,
-                getRaftActorSnapshotCohort());
+    @VisibleForTesting
+    RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
+        return new RaftActorSnapshotMessageSupport(context, getRaftActorSnapshotCohort());
     }
 
     private void onGetOnDemandRaftStats() {
@@ -388,6 +382,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             peerAddresses.put(peerId, context.getPeerAddress(peerId));
         }
 
+        final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
         OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
@@ -406,7 +401,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .peerAddresses(peerAddresses)
                 .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
 
-        ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+        ReplicatedLogEntry lastLogEntry = replicatedLog().last();
         if (lastLogEntry != null) {
             builder.lastLogIndex(lastLogEntry.getIndex());
             builder.lastLogTerm(lastLogEntry.getTerm());
@@ -518,7 +513,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
 
                     // Send message for replication
-                    currentBehavior.handleMessage(getSelf(),
+                    getCurrentBehavior().handleMessage(getSelf(),
                             new Replicate(clientActor, identifier, replicatedLogEntry));
                 }
             }
@@ -535,11 +530,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @VisibleForTesting
     void setCurrentBehavior(RaftActorBehavior behavior) {
-        currentBehavior.setDelegate(behavior);
+        context.setCurrentBehavior(behavior);
     }
 
     protected RaftActorBehavior getCurrentBehavior() {
-        return currentBehavior.getDelegate();
+        return context.getCurrentBehavior();
     }
 
     /**
@@ -549,11 +544,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @return true it this RaftActor is a Leader false otherwise
      */
     protected boolean isLeader() {
-        return context.getId().equals(currentBehavior.getLeaderId());
+        return context.getId().equals(getCurrentBehavior().getLeaderId());
     }
 
-    protected boolean isLeaderActive() {
-        return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown &&
+    protected final boolean isLeaderActive() {
+        return getRaftState() != RaftState.IsolatedLeader && !shuttingDown &&
                 !isLeadershipTransferInProgress();
     }
 
@@ -582,30 +577,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @return the current leader's id
      */
-    protected String getLeaderId(){
-        return currentBehavior.getLeaderId();
-    }
-
-    protected RaftState getRaftState() {
-        return currentBehavior.state();
+    protected final String getLeaderId(){
+        return getCurrentBehavior().getLeaderId();
     }
 
-    protected ReplicatedLogEntry getLastLogEntry() {
-        return replicatedLog().last();
+    @VisibleForTesting
+    protected final RaftState getRaftState() {
+        return getCurrentBehavior().state();
     }
 
     protected Long getCurrentTerm(){
         return context.getTermInformation().getCurrentTerm();
     }
 
-    protected Long getCommitIndex(){
-        return context.getCommitIndex();
-    }
-
-    protected Long getLastApplied(){
-        return context.getLastApplied();
-    }
-
     protected RaftActorContext getRaftActorContext() {
         return context;
     }
@@ -625,7 +609,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
             // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
             // avoids potential disruption. Otherwise, switch to Follower normally.
-            RaftActorBehavior behavior = currentBehavior.getDelegate();
+            RaftActorBehavior behavior = getCurrentBehavior();
             if(behavior instanceof Follower) {
                 String previousLeaderId = ((Follower)behavior).getLeaderId();
                 short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
@@ -757,13 +741,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         operation.run();
     }
 
-    protected void onLeaderChanged(String oldLeader, String newLeader){};
+    protected void onLeaderChanged(String oldLeader, String newLeader) {
+
+    };
 
     private String getLeaderAddress(){
         if(isLeader()){
             return getSelf().path().toString();
         }
-        String leaderId = currentBehavior.getLeaderId();
+        String leaderId = getLeaderId();
         if (leaderId == null) {
             return null;
         }
@@ -783,11 +769,42 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void captureSnapshot() {
         SnapshotManager snapshotManager = context.getSnapshotManager();
 
-        if(!snapshotManager.isCapturing()) {
+        if (!snapshotManager.isCapturing()) {
+            final long idx = getCurrentBehavior().getReplicatedToAllIndex();
             LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
-                replicatedLog().last(), currentBehavior.getReplicatedToAllIndex());
+                replicatedLog().last(), idx);
+
+            snapshotManager.capture(replicatedLog().last(), idx);
+        }
+    }
+
+    /**
+     * Switch this member to non-voting status. This is a no-op for all behaviors except when we are the leader,
+     * in which case we need to step down.
+     */
+    void becomeNonVoting() {
+        if (isLeader()) {
+            initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+                @Override
+                public void onSuccess(ActorRef raftActorRef) {
+                    LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
+                    ensureFollowerState();
+                }
 
-            snapshotManager.capture(replicatedLog().last(), currentBehavior.getReplicatedToAllIndex());
+                @Override
+                public void onFailure(ActorRef raftActorRef) {
+                    LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
+                    ensureFollowerState();
+                }
+
+                private void ensureFollowerState() {
+                    // Whether or not leadership transfer succeeded, we have to step down as leader and
+                    // switch to Follower so ensure that.
+                    if (getRaftState() != RaftState.Follower) {
+                        initializeBehavior();
+                    }
+                }
+            });
         }
     }
 
@@ -879,7 +896,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             if(this.message instanceof SwitchBehavior){
                 return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState());
             }
-            return currentBehavior.handleMessage(sender, message);
+            return getCurrentBehavior().handleMessage(sender, message);
         }
     }
 }