X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=7ec9f2743c7b6472af4e6ccf521e7c5b4f030e02;hp=1c057d755369d50a2147cf7c3e2922783ae56fc8;hb=c50fd31278f5f24ed487a6f6021f6065bc2fe93f;hpb=013a6679470bf692753f2e04ab4398c97fd9f5d0 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1c057d7553..7ec9f2743c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; +import akka.actor.Status; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -49,6 +50,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; @@ -119,8 +121,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private RaftActorServerConfigurationSupport serverConfigurationSupport; - private RaftActorLeadershipTransferCohort leadershipTransferInProgress; - private boolean shuttingDown; protected RaftActor(String id, Map peerAddresses, @@ -263,12 +263,53 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof Runnable) { ((Runnable)message).run(); } else if (message instanceof NoopPayload) { - persistData(null, null, (NoopPayload)message, false); + persistData(null, null, (NoopPayload) message, false); + } else if (message instanceof RequestLeadership) { + onRequestLeadership((RequestLeadership) message); } else if (!possiblyHandleBehaviorMessage(message)) { handleNonRaftCommand(message); } } + private void onRequestLeadership(final RequestLeadership message) { + LOG.debug("{}: onRequestLeadership {}", persistenceId(), message); + if (!isLeader()) { + // non-leader cannot satisfy leadership request + LOG.warn("{}: onRequestLeadership {} was sent to non-leader." + + " Current behavior: {}. Sending failure response", + persistenceId(), getCurrentBehavior().state()); + message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to " + + message.getRequestedFollowerId() + + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf()); + return; + } + + final String requestedFollowerId = message.getRequestedFollowerId(); + final ActorRef replyTo = message.getReplyTo(); + initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { + @Override + public void onSuccess(final ActorRef raftActorRef) { + // sanity check + if (!requestedFollowerId.equals(getLeaderId())) { + onFailure(raftActorRef); + } + + LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId); + replyTo.tell(new Status.Success(null), getSelf()); + } + + @Override + public void onFailure(final ActorRef raftActorRef) { + LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId); + replyTo.tell(new Status.Failure( + new LeadershipTransferFailedException( + "Failed to transfer leadership to " + requestedFollowerId + + ". Follower is not ready to become leader")), + getSelf()); + } + }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT); + } + private boolean possiblyHandleBehaviorMessage(final Object message) { final RaftActorBehavior currentBehavior = getCurrentBehavior(); final BehaviorState state = behaviorStateTracker.capture(currentBehavior); @@ -285,25 +326,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return false; } - private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { + private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete, + @Nullable final String followerId, long newLeaderTimeoutInMillis) { LOG.debug("{}: Initiating leader transfer", persistenceId()); + RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort(); if (leadershipTransferInProgress == null) { - leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this); + leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId); + leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis); leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { @Override public void onSuccess(ActorRef raftActorRef) { - leadershipTransferInProgress = null; + context.setRaftActorLeadershipTransferCohort(null); } @Override public void onFailure(ActorRef raftActorRef) { - leadershipTransferInProgress = null; + context.setRaftActorLeadershipTransferCohort(null); } }); leadershipTransferInProgress.addOnComplete(onComplete); + + context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress); leadershipTransferInProgress.init(); + } else { LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId()); leadershipTransferInProgress.addOnComplete(onComplete); @@ -320,10 +367,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { shuttingDown = true; final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - if (currentBehavior.state() != RaftState.Leader) { - // For non-leaders shutdown is a no-op - self().tell(PoisonPill.getInstance(), self()); - return; + switch (currentBehavior.state()) { + case Leader: + case PreLeader: + // Fall-through to more work + break; + default: + // For non-leaders shutdown is a no-op + self().tell(PoisonPill.getInstance(), self()); + return; } if (context.hasFollowers()) { @@ -339,7 +391,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId()); raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); } - }); + }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS)); } else { pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) { @Override @@ -469,6 +521,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId()); + RaftActorLeadershipTransferCohort leadershipTransferInProgress = + context.getRaftActorLeadershipTransferCohort(); if (leadershipTransferInProgress != null) { leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId()); } @@ -606,7 +660,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { && !shuttingDown && !isLeadershipTransferInProgress(); } - private boolean isLeadershipTransferInProgress() { + protected boolean isLeadershipTransferInProgress() { + RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort(); return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring(); } @@ -860,7 +915,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { initializeBehavior(); } } - }); + }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT); } }