X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=6d01752a95e17b4c45b4e62f7b96fc85ee730f3c;hp=70a0b86952a3ea22e04921860ae7a2bd5ef97dc1;hb=de1ed2cb86a3c897d307a4b73a89384465c3ca6f;hpb=0c05dff15e4f36c5ecbd26e82309de21f67c8cd5 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 70a0b86952..6d01752a95 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -121,8 +121,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private RaftActorServerConfigurationSupport serverConfigurationSupport; - private RaftActorLeadershipTransferCohort leadershipTransferInProgress; - private boolean shuttingDown; protected RaftActor(String id, Map peerAddresses, @@ -309,7 +307,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { + ". Follower is not ready to become leader")), getSelf()); } - }, message.getRequestedFollowerId()); + }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT); } private boolean possiblyHandleBehaviorMessage(final Object message) { @@ -328,30 +326,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return false; } - private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { - initiateLeadershipTransfer(onComplete, null); - } - private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete, - final String followerId) { + @Nullable final String followerId, long newLeaderTimeoutInMillis) { LOG.debug("{}: Initiating leader transfer", persistenceId()); + RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort(); if (leadershipTransferInProgress == null) { leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId); + leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis); leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { @Override public void onSuccess(ActorRef raftActorRef) { - leadershipTransferInProgress = null; + context.setRaftActorLeadershipTransferCohort(null); } @Override public void onFailure(ActorRef raftActorRef) { - leadershipTransferInProgress = null; + context.setRaftActorLeadershipTransferCohort(null); } }); leadershipTransferInProgress.addOnComplete(onComplete); + + context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress); leadershipTransferInProgress.init(); + } else { LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId()); leadershipTransferInProgress.addOnComplete(onComplete); @@ -368,10 +367,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { shuttingDown = true; final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - if (currentBehavior.state() != RaftState.Leader) { - // For non-leaders shutdown is a no-op - self().tell(PoisonPill.getInstance(), self()); - return; + switch (currentBehavior.state()) { + case Leader: + case PreLeader: + // Fall-through to more work + break; + default: + // For non-leaders shutdown is a no-op + self().tell(PoisonPill.getInstance(), self()); + return; } if (context.hasFollowers()) { @@ -387,7 +391,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId()); raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); } - }); + }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS)); } else { pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) { @Override @@ -517,6 +521,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId()); + RaftActorLeadershipTransferCohort leadershipTransferInProgress = + context.getRaftActorLeadershipTransferCohort(); if (leadershipTransferInProgress != null) { leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId()); } @@ -654,7 +660,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { && !shuttingDown && !isLeadershipTransferInProgress(); } - private boolean isLeadershipTransferInProgress() { + protected boolean isLeadershipTransferInProgress() { + RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort(); return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring(); } @@ -849,6 +856,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { operation.run(); } + /** + * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader + * should resume normal operations. + * + *

+ * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked. + */ + protected void unpauseLeader() { + + } + protected void onLeaderChanged(String oldLeader, String newLeader) { } @@ -908,7 +926,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { initializeBehavior(); } } - }); + }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT); } }