BUG 8618: Log leader status when rejecting request
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 46551506e337182df1c0f7a05e2a6fe1e11043ab..9d970f1695ddb422c9a364428f5f9214387282de 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
+import akka.actor.Status;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -49,8 +50,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.Immutable;
@@ -117,8 +121,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private RaftActorServerConfigurationSupport serverConfigurationSupport;
 
-    private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
-
     private boolean shuttingDown;
 
     protected RaftActor(String id, Map<String, String> peerAddresses,
@@ -131,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
             configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
-            delegatingPersistenceProvider, LOG);
+            delegatingPersistenceProvider, this::handleApplyState, LOG);
 
         context.setPayloadVersion(payloadVersion);
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
@@ -223,29 +225,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
             return;
         }
-
         if (message instanceof ApplyState) {
             ApplyState applyState = (ApplyState) message;
 
-            long startTime = System.nanoTime();
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("{}: Applying state for log index {} data {}",
-                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
-                    applyState.getReplicatedLogEntry().getData());
-            }
-
-            if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) {
-                applyState(applyState.getClientActor(), applyState.getIdentifier(),
-                    applyState.getReplicatedLogEntry().getData());
-            }
-
-            long elapsedTime = System.nanoTime() - startTime;
-            if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
-                LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
-                        TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
-            }
-
             if (!hasFollowers()) {
                 // for single node, the capture should happen after the apply state
                 // as we delete messages from the persistent journal which have made it to the snapshot
@@ -256,9 +238,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getSnapshotManager().trimLog(context.getLastApplied());
             }
 
-            // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState.
             possiblyHandleBehaviorMessage(message);
-
         } else if (message instanceof ApplyJournalEntries) {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
@@ -277,18 +257,59 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof SwitchBehavior) {
             switchBehavior((SwitchBehavior) message);
         } else if (message instanceof LeaderTransitioning) {
-            onLeaderTransitioning();
+            onLeaderTransitioning((LeaderTransitioning)message);
         } else if (message instanceof Shutdown) {
             onShutDown();
         } else if (message instanceof Runnable) {
             ((Runnable)message).run();
         } else if (message instanceof NoopPayload) {
-            persistData(null, null, (NoopPayload)message);
+            persistData(null, null, (NoopPayload) message, false);
+        } else if (message instanceof RequestLeadership) {
+            onRequestLeadership((RequestLeadership) message);
         } else if (!possiblyHandleBehaviorMessage(message)) {
             handleNonRaftCommand(message);
         }
     }
 
+    /**
+     * Handles a {@link RequestLeadership} message asking this actor to transfer leadership to the
+     * follower named in the request.
+     *
+     * <p>If this actor is not the leader, a warning that includes the request and the current behavior
+     * state is logged, a {@link LeadershipTransferFailedException} is sent to the request's reply-to
+     * actor and the method returns. Otherwise a leadership transfer to the requested follower is
+     * initiated with the default new-leader timeout, and the reply-to actor is told the outcome once the
+     * transfer completes: {@link Status.Success} when the requested follower is the new leader,
+     * {@link Status.Failure} otherwise. Exactly one reply is sent per completed transfer.
+     *
+     * @param message the request carrying the requested follower id and the actor to reply to
+     */
+    private void onRequestLeadership(final RequestLeadership message) {
+        LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+
+        final String requestedFollowerId = message.getRequestedFollowerId();
+        final ActorRef replyTo = message.getReplyTo();
+
+        if (!isLeader()) {
+            // non-leader cannot satisfy leadership request
+            // All three placeholders need an argument: actor id, the request, and the behavior state.
+            LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+                    + " Current behavior: {}. Sending failure response",
+                    persistenceId(), message, getCurrentBehavior().state());
+            // NOTE(review): this path replies with the bare exception while the transfer-failure path
+            // below wraps it in Status.Failure - confirm which form the requesters expect.
+            replyTo.tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+                    + requestedFollowerId
+                    + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+            return;
+        }
+
+        initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+            @Override
+            public void onSuccess(final ActorRef raftActorRef) {
+                // Sanity check: only report success if the requested follower is the current leader.
+                if (!requestedFollowerId.equals(getLeaderId())) {
+                    onFailure(raftActorRef);
+                    // Do not fall through, otherwise the requester would receive a Failure followed
+                    // by a Success for the same request.
+                    return;
+                }
+
+                LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Success(null), getSelf());
+            }
+
+            @Override
+            public void onFailure(final ActorRef raftActorRef) {
+                LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Failure(
+                        new LeadershipTransferFailedException(
+                                "Failed to transfer leadership to " + requestedFollowerId
+                                        + ". Follower is not ready to become leader")),
+                        getSelf());
+            }
+        }, requestedFollowerId, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
+    }
+
     private boolean possiblyHandleBehaviorMessage(final Object message) {
         final RaftActorBehavior currentBehavior = getCurrentBehavior();
         final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
@@ -305,25 +326,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return false;
     }
 
-    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+            @Nullable final String followerId, long newLeaderTimeoutInMillis) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         if (leadershipTransferInProgress == null) {
-            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
+            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+            leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
                 public void onSuccess(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
 
                 @Override
                 public void onFailure(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
             });
 
             leadershipTransferInProgress.addOnComplete(onComplete);
+
+            context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
             leadershipTransferInProgress.init();
+
         } else {
             LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
             leadershipTransferInProgress.addOnComplete(onComplete);
@@ -359,7 +386,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
                 }
-            });
+            }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
         } else {
             pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
                 @Override
@@ -375,10 +402,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private void onLeaderTransitioning() {
-        LOG.debug("{}: onLeaderTransitioning", persistenceId());
+    private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
+        LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
+                && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
                 getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
         }
@@ -387,7 +415,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void switchBehavior(SwitchBehavior message) {
         if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
             RaftState newState = message.getNewState();
-            if ( newState == RaftState.Leader || newState == RaftState.Follower) {
+            if (newState == RaftState.Leader || newState == RaftState.Follower) {
                 switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
                     AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
                 getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
@@ -418,7 +446,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-        OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+        OnDemandRaftState.AbstractBuilder<?> builder = newOnDemandRaftStateBuilder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog().dataSize())
@@ -462,6 +490,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     }
 
+    /**
+     * Creates the builder that this actor populates (commit index, current term, in-memory journal
+     * data size, ...) when assembling its on-demand Raft state. This implementation returns
+     * {@link OnDemandRaftState#builder()}.
+     *
+     * <p>NOTE(review): the method is protected and non-final, presumably so that subclasses can return
+     * an extended builder - confirm against the subclasses.
+     *
+     * @return a new {@code OnDemandRaftState} builder
+     */
+    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+        return OnDemandRaftState.builder();
+    }
+
     private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
 
@@ -484,6 +516,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
 
+            RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+                    context.getRaftActorLeadershipTransferCohort();
             if (leadershipTransferInProgress != null) {
                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
             }
@@ -498,6 +532,29 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    /**
+     * Applies the payload of the given {@link ApplyState} and then re-posts the message to this actor.
+     *
+     * <p>The payload is passed to {@code applyState} unless it is a {@link NoopPayload} or a
+     * {@link ServerConfigurationPayload}. The elapsed time is measured and a debug message is logged
+     * when it reaches {@code APPLY_STATE_DELAY_THRESHOLD_IN_NANOS}. Finally the same {@code ApplyState}
+     * instance is sent to self so the remaining handling runs as a separate message.
+     *
+     * @param applyState the message describing the replicated log entry to apply
+     */
+    private void handleApplyState(ApplyState applyState) {
+        final long startedAtNanos = System.nanoTime();
+
+        final ReplicatedLogEntry logEntry = applyState.getReplicatedLogEntry();
+        final Payload data = logEntry.getData();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}: Applying state for log index {} data {}",
+                persistenceId(), logEntry.getIndex(), data);
+        }
+
+        // Noop and server-configuration payloads are not handed to the applyState callback.
+        final boolean skipCallback = data instanceof NoopPayload || data instanceof ServerConfigurationPayload;
+        if (!skipCallback) {
+            applyState(applyState.getClientActor(), applyState.getIdentifier(), data);
+        }
+
+        final long tookNanos = System.nanoTime() - startedAtNanos;
+        if (tookNanos >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
+            LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+                    TimeUnit.NANOSECONDS.toMillis(tookNanos), applyState);
+        }
+
+        // Hand the message back to this actor; the rest of the processing happens when it is received.
+        final ActorRef me = self();
+        me.tell(applyState, me);
+    }
+
     protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
         return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
     }
@@ -513,12 +570,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     /**
-     * When a derived RaftActor needs to persist something it must call
-     * persistData.
+     * Persists the given Payload in the journal and replicates to any followers. After successful completion,
+     * {@link #applyState(ActorRef, Identifier, Object)} is notified.
+     *
+     * @param clientActor optional ActorRef that is provided via the applyState callback
+     * @param identifier the payload identifier
+     * @param data the payload data to persist
+     * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with
+     *        subsequent payloads for efficiency. Otherwise the payload is immediately replicated.
      */
-    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
-
-        ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data,
+            final boolean batchHint) {
+        ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
         replicatedLogEntry.setPersistencePending(true);
@@ -537,7 +600,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 raftContext.setLastApplied(persistedLogEntry.getIndex());
 
                 // Apply the state immediately.
-                self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self());
+                handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
 
                 // Send a ApplyJournalEntries message so that we write the fact that we applied
                 // the state to durable storage
@@ -555,7 +618,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         if (wasAppended && hasFollowers()) {
             // Send log entry for replication.
-            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry));
+            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
+                    !batchHint));
         }
     }
 
@@ -591,7 +655,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 && !shuttingDown && !isLeadershipTransferInProgress();
     }
 
-    private boolean isLeadershipTransferInProgress() {
+    protected boolean isLeadershipTransferInProgress() {
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
     }
 
@@ -751,7 +816,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onRecoveryComplete();
 
     /**
-     * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
+     * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
      */
     @Nonnull
     protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
@@ -845,7 +910,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         initializeBehavior();
                     }
                 }
-            });
+            }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
         }
     }