X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=fd6eb17384448bbef1b0885fd0b99d7b4997f0d1;hb=ed0c0135e2563fbbfcec41975338cece15c62cc2;hp=1c057d755369d50a2147cf7c3e2922783ae56fc8;hpb=013a6679470bf692753f2e04ab4398c97fd9f5d0;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1c057d7553..fd6eb17384 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; +import akka.actor.Status; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -49,6 +50,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; @@ -119,8 +121,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private RaftActorServerConfigurationSupport serverConfigurationSupport; - private RaftActorLeadershipTransferCohort leadershipTransferInProgress; - private boolean shuttingDown; protected RaftActor(String id, Map peerAddresses, @@ -263,12 +263,53 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof Runnable) { ((Runnable)message).run(); } else if (message instanceof NoopPayload) { - persistData(null, null, (NoopPayload)message, false); + persistData(null, null, (NoopPayload) message, false); + } else if (message instanceof RequestLeadership) { + onRequestLeadership((RequestLeadership) message); } else if (!possiblyHandleBehaviorMessage(message)) { handleNonRaftCommand(message); } } + private void onRequestLeadership(final RequestLeadership message) { + LOG.debug("{}: onRequestLeadership {}", persistenceId(), message); + if (!isLeader()) { + // non-leader cannot satisfy leadership request + LOG.warn("{}: onRequestLeadership {} was sent to non-leader." + + " Current behavior: {}. Sending failure response", + persistenceId(), getCurrentBehavior().state()); + message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to " + + message.getRequestedFollowerId() + + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf()); + return; + } + + final String requestedFollowerId = message.getRequestedFollowerId(); + final ActorRef replyTo = message.getReplyTo(); + initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { + @Override + public void onSuccess(final ActorRef raftActorRef) { + // sanity check + if (!requestedFollowerId.equals(getLeaderId())) { + onFailure(raftActorRef); + } + + LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId); + replyTo.tell(new Status.Success(null), getSelf()); + } + + @Override + public void onFailure(final ActorRef raftActorRef) { + LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId); + replyTo.tell(new Status.Failure( + new LeadershipTransferFailedException( + "Failed to transfer leadership to " + requestedFollowerId + + ". Follower is not ready to become leader")), + getSelf()); + } + }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT); + } + private boolean possiblyHandleBehaviorMessage(final Object message) { final RaftActorBehavior currentBehavior = getCurrentBehavior(); final BehaviorState state = behaviorStateTracker.capture(currentBehavior); @@ -285,25 +326,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return false; } - private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { + private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete, + @Nullable final String followerId, long newLeaderTimeoutInMillis) { LOG.debug("{}: Initiating leader transfer", persistenceId()); + RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort(); if (leadershipTransferInProgress == null) { - leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this); + leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId); + leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis); leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { @Override public void onSuccess(ActorRef raftActorRef) { - leadershipTransferInProgress = null; + context.setRaftActorLeadershipTransferCohort(null); } @Override public void onFailure(ActorRef raftActorRef) { - leadershipTransferInProgress = null; + context.setRaftActorLeadershipTransferCohort(null); } }); leadershipTransferInProgress.addOnComplete(onComplete); + + context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress); leadershipTransferInProgress.init(); + } else { LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId()); leadershipTransferInProgress.addOnComplete(onComplete); @@ -320,10 +367,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { shuttingDown = true; final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - if (currentBehavior.state() != RaftState.Leader) { - // For non-leaders shutdown is a no-op - self().tell(PoisonPill.getInstance(), self()); - return; + switch (currentBehavior.state()) { + case Leader: + case PreLeader: + // Fall-through to more work + break; + default: + // For non-leaders shutdown is a no-op + self().tell(PoisonPill.getInstance(), self()); + return; } if (context.hasFollowers()) { @@ -339,7 +391,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId()); raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); } - }); + }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS)); } else { pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) { @Override @@ -399,7 +451,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() + OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog().dataSize()) @@ -443,7 +495,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } - protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { return OnDemandRaftState.builder(); } @@ -469,6 +521,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId()); + RaftActorLeadershipTransferCohort leadershipTransferInProgress = + context.getRaftActorLeadershipTransferCohort(); if (leadershipTransferInProgress != null) { leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId()); } @@ -606,7 +660,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { && !shuttingDown && !isLeadershipTransferInProgress(); } - private boolean isLeadershipTransferInProgress() { + protected boolean isLeadershipTransferInProgress() { + RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort(); return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring(); } @@ -695,7 +750,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { captureSnapshot(); } } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) { - setPersistence(new NonPersistentDataProvider() { + setPersistence(new NonPersistentDataProvider(this) { /** * The way snapshotting works is, *
    @@ -801,6 +856,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { operation.run(); } + /** + * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader + * should resume normal operations. + * + *

    + * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked. + */ + protected void unpauseLeader() { + + } + protected void onLeaderChanged(String oldLeader, String newLeader) { } @@ -860,7 +926,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { initializeBehavior(); } } - }); + }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT); } }