X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorLeadershipTransferCohort.java;h=85980e2ca371fe752d1743cc0f3aab6df186628a;hp=623fa4902ca6fc45ce5d86630ef620afde3b9c10;hb=92cbb07ef81943b0740ba7c8915001ac6785f560;hpb=4e000b89c3b5ac555cb1e2c39e999a8633b48a96 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java index 623fa4902c..85980e2ca3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java @@ -48,19 +48,18 @@ import scala.concurrent.duration.FiniteDuration; * * @author Thomas Pantelis */ -public class RaftActorLeadershipTransferCohort implements Runnable { +public class RaftActorLeadershipTransferCohort { private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class); private final RaftActor raftActor; - private final ActorRef replyTo; private Cancellable newLeaderTimer; private final List onCompleteCallbacks = new ArrayList<>(); private long newLeaderTimeoutInMillis = 2000; private final Stopwatch transferTimer = Stopwatch.createUnstarted(); + private boolean isTransferring; - RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) { + RaftActorLeadershipTransferCohort(RaftActor raftActor) { this.raftActor = raftActor; - this.replyTo = replyTo; } void init() { @@ -75,25 +74,36 @@ public class RaftActorLeadershipTransferCohort implements Runnable { currentBehavior.getLeaderPayloadVersion()), raftActor.self()); } - LeaderTransitioning leaderTransitioning = new LeaderTransitioning(); for(String peerId: context.getPeerIds()) { ActorSelection followerActor = context.getPeerActorSelection(peerId); if(followerActor != null) { - followerActor.tell(leaderTransitioning, context.getActor()); + followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor()); } } - raftActor.pauseLeader(this); + raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) { + @Override + protected void doRun() { + doTransfer(); + } + + @Override + protected void doCancel() { + LOG.debug("{}: pauseLeader timed out - aborting transfer", raftActor.persistenceId()); + abortTransfer(); + } + }); } /** - * This method is invoked to run the leadership transfer. + * This method is invoked to perform the leadership transfer. */ - @Override - public void run() { + @VisibleForTesting + void doTransfer() { RaftActorBehavior behavior = raftActor.getCurrentBehavior(); // Sanity check... if(behavior instanceof Leader) { + isTransferring = true; ((Leader)behavior).transferLeadership(this); } else { LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId()); @@ -123,7 +133,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable { // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it - // safely run on actor's thread dispatcher. + // safely run on the actor's thread dispatcher. FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS); newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(), new Runnable() { @@ -144,22 +154,22 @@ public class RaftActorLeadershipTransferCohort implements Runnable { } private void finish(boolean success) { + isTransferring = false; if(transferTimer.isRunning()) { transferTimer.stop(); if(success) { LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(), - raftActor.getLeaderId(), transferTimer.toString()); + raftActor.getLeaderId(), transferTimer); } else { - LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), - transferTimer.toString()); + LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer); } } for(OnComplete onComplete: onCompleteCallbacks) { if(success) { - onComplete.onSuccess(raftActor.self(), replyTo); + onComplete.onSuccess(raftActor.self()); } else { - onComplete.onFailure(raftActor.self(), replyTo); + onComplete.onFailure(raftActor.self()); } } } @@ -168,13 +178,17 @@ public class RaftActorLeadershipTransferCohort implements Runnable { onCompleteCallbacks.add(onComplete); } + boolean isTransferring() { + return isTransferring; + } + @VisibleForTesting void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) { this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis; } interface OnComplete { - void onSuccess(ActorRef raftActorRef, ActorRef replyTo); - void onFailure(ActorRef raftActorRef, ActorRef replyTo); + void onSuccess(ActorRef raftActorRef); + void onFailure(ActorRef raftActorRef); } }