/* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.raft.behaviors; import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; import akka.actor.ActorSelection; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import java.util.Optional; import java.util.concurrent.TimeUnit; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; /** * The behavior of a RaftActor when it is in the Leader state. * *

* Leaders: *

*/ public class Leader extends AbstractLeader { /** * Internal message sent to periodically check if this leader has become isolated and should transition * to {@link IsolatedLeader}. */ @VisibleForTesting static final Object ISOLATED_LEADER_CHECK = new Object(); private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted(); private @Nullable LeadershipTransferContext leadershipTransferContext; Leader(final RaftActorContext context, @Nullable final AbstractLeader initializeFromLeader) { super(context, RaftState.Leader, initializeFromLeader); } public Leader(final RaftActorContext context) { this(context, null); } @Override public RaftActorBehavior handleMessage(final ActorRef sender, final Object originalMessage) { requireNonNull(sender, "sender should not be null"); if (ISOLATED_LEADER_CHECK.equals(originalMessage)) { if (isLeaderIsolated()) { log.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader", context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId()); return internalSwitchBehavior(new IsolatedLeader(context, this)); } else { return this; } } else { return super.handleMessage(sender, originalMessage); } } @Override protected void beforeSendHeartbeat() { if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()) { context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor()); isolatedLeaderCheck.reset().start(); } if (leadershipTransferContext != null && leadershipTransferContext.isExpired( context.getConfigParams().getElectionTimeOutInterval().toMillis())) { log.debug("{}: Leadership transfer expired", logName()); leadershipTransferContext = null; } } @Override protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, final AppendEntriesReply appendEntriesReply) { RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply); tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId()); return returnBehavior; } /** * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows: * * * @param leadershipTransferCohort the cohort participating in the leadership transfer */ public void transferLeadership(@NonNull final RaftActorLeadershipTransferCohort leadershipTransferCohort) { log.debug("{}: Attempting to transfer leadership", logName()); leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort); // Send an immediate heart beat to the followers. sendAppendEntries(0, false); } private void tryToCompleteLeadershipTransfer(final String followerId) { if (leadershipTransferContext == null) { return; } final Optional requestedFollowerIdOptional = leadershipTransferContext.transferCohort.getRequestedFollowerId(); if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.orElseThrow().equals(followerId)) { // we want to transfer leadership to specific follower return; } FollowerLogInformation followerInfo = getFollower(followerId); if (followerInfo == null) { return; } long lastIndex = context.getReplicatedLog().lastIndex(); boolean isVoting = context.getPeerInfo(followerId).isVoting(); log.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}", logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting); if (isVoting && followerInfo.getMatchIndex() == lastIndex) { log.debug("{}: Follower's log matches - sending ElectionTimeout", logName()); // We can't be sure if the follower has applied all its log entries to its state so send an // additional AppendEntries with the latest commit index. sendAppendEntries(0, false); // Now send a TimeoutNow message to the matching follower to immediately start an election. ActorSelection followerActor = context.getPeerActorSelection(followerId); followerActor.tell(TimeoutNow.INSTANCE, context.getActor()); log.debug("{}: Leader transfer complete", logName()); leadershipTransferContext.transferCohort.transferComplete(); leadershipTransferContext = null; } } @Override public void close() { if (leadershipTransferContext != null) { LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext; leadershipTransferContext = null; localLeadershipTransferContext.transferCohort.abortTransfer(); } super.close(); } @VisibleForTesting void markFollowerActive(final String followerId) { getFollower(followerId).markFollowerActive(); } @VisibleForTesting void markFollowerInActive(final String followerId) { getFollower(followerId).markFollowerInActive(); } private static class LeadershipTransferContext { RaftActorLeadershipTransferCohort transferCohort; Stopwatch timer = Stopwatch.createStarted(); LeadershipTransferContext(final RaftActorLeadershipTransferCohort transferCohort) { this.transferCohort = transferCohort; } boolean isExpired(final long timeout) { if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) { transferCohort.abortTransfer(); return true; } return false; } } }