From: Tom Pantelis Date: Wed, 16 Dec 2015 08:57:15 +0000 (-0500) Subject: Add RaftActorLeadershipTransferCohort and implement transfer in Leader X-Git-Tag: release/beryllium~64 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=db8fed63e18fccd2721fa7e189b2278a4f240f2c;hp=156a5dd91d9cb249d36c2b88c71b32b83d9332bf;p=controller.git Add RaftActorLeadershipTransferCohort and implement transfer in Leader Added a transferLeadership method to Leader that takes a RaftActorLeadershipTransferCohort to be notified when transfer is complete. The leader looks for a follower via AppendEntriesReply whose cached matchIndex matches the leader's last index. If one is found, it sends an additional AppendEntries to ensure the follower has applied all its log entries to its state. It then sends an ElectionTimeout to immediately start an election and notifies the RaftActorLeadershipTransferCohort via transferComplete. If no matching follower is found initially, the leader tries again at the next heartbeat interval via AppendEntriesReply. This continues until either a matching follower is found or the election timeout period elapses. The latter is checked at each heartbeat interval via beforeSendHeartbeat. On timeout, it notifies the RaftActorLeadershipTransferCohort via abortTransfer. Change-Id: I841e13fdde27ee57b9789a4df6f69bf9901c1e79 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java new file mode 100644 index 0000000000..77678e9f24 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; + +/** + * A helper class that participates in raft actor leadership transfer. An instance is created upon + * initialization of leadership transfer. + *

+ * NOTE: All methods on this class must be called on the actor's thread dispatcher as they modify internal state. + * + * @author Thomas Pantelis + */ +public abstract class RaftActorLeadershipTransferCohort { + private final RaftActor raftActor; + + protected RaftActorLeadershipTransferCohort(RaftActor raftActor) { + this.raftActor = raftActor; + } + + /** + * This method is invoked to start leadership transfer. + */ + public void startTransfer() { + RaftActorBehavior behavior = raftActor.getCurrentBehavior(); + if(behavior instanceof Leader) { + ((Leader)behavior).transferLeadership(this); + } + } + + /** + * This method is invoked to abort leadership transfer. + */ + public void abortTransfer() { + transferComplete(); + } + + /** + * This method is invoked when leadership transfer is complete. + */ + public abstract void transferComplete(); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 497d98cdce..f1a178d0ca 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -509,7 +509,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { + protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { // Send an AppendEntries to all followers for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java 
b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 6d3e364467..6bbb70ce6b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -8,13 +8,20 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; /** * The behavior of a RaftActor when it is in the Leader state @@ -41,6 +48,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderChec public class Leader extends AbstractLeader { private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck(); private final Stopwatch isolatedLeaderCheck; + private @Nullable LeadershipTransferContext leadershipTransferContext; public Leader(RaftActorContext context) { super(context); @@ -69,10 +77,73 @@ public class Leader extends AbstractLeader { isolatedLeaderCheck.reset().start(); } + if(leadershipTransferContext != null && leadershipTransferContext.isExpired( + 
context.getConfigParams().getElectionTimeOutInterval().toMillis())) { + LOG.debug("{}: Leadership transfer expired", logName()); + leadershipTransferContext = null; + } + } + + @Override + protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { + RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply); + tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId()); + return returnBehavior; + } + + public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) { + if(!context.hasFollowers()) { + leadershipTransferCohort.transferComplete(); + return; + } + + LOG.debug("{}: Attempting to transfer leadership", logName()); + + leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort); + + // Send an immediate heart beat to the followers. + sendAppendEntries(0, false); + } + + private void tryToCompleteLeadershipTransfer(String followerId) { + if(leadershipTransferContext == null) { + return; + } + + FollowerLogInformation followerInfo = getFollower(followerId); + if(followerInfo == null) { + return; + } + + long lastIndex = context.getReplicatedLog().lastIndex(); + + LOG.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}", + logName(), followerId, followerInfo.getMatchIndex(), lastIndex); + + if(followerInfo.getMatchIndex() == lastIndex) { + LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName()); + + // We can't be sure if the follower has applied all its log entries to its state so send an + // additional AppendEntries. + sendAppendEntries(0, false); + + // Now send an ElectionTimeout to the matching follower to immediately start an election. 
+ ActorSelection followerActor = context.getPeerActorSelection(followerId); + followerActor.tell(new ElectionTimeout(), context.getActor()); + + LOG.debug("{}: Leader transfer complete", logName()); + + leadershipTransferContext.transferCohort.transferComplete(); + leadershipTransferContext = null; + } } @Override public void close() throws Exception { + if(leadershipTransferContext != null) { + leadershipTransferContext.transferCohort.abortTransfer(); + } + super.close(); } @@ -85,4 +156,22 @@ public class Leader extends AbstractLeader { void markFollowerInActive(String followerId) { getFollower(followerId).markFollowerInActive(); } + + private static class LeadershipTransferContext { + RaftActorLeadershipTransferCohort transferCohort; + Stopwatch timer = Stopwatch.createStarted(); + + LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) { + this.transferCohort = transferCohort; + } + + boolean isExpired(long timeout) { + if(timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) { + transferCohort.abortTransfer(); + return true; + } + + return false; + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 6664600073..c39c62c727 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -11,6 +11,9 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import 
akka.actor.PoisonPill; import akka.actor.Props; @@ -32,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; @@ -41,6 +45,7 @@ import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; @@ -1943,6 +1948,166 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); } + @Test + public void testTransferLeadershipWithFollowerInSync() { + logStart("testTransferLeadershipWithFollowerInSync"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); 
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, 0); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + + // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date + MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithEmptyLog() { + logStart("testTransferLeadershipWithEmptyLog"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + 
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + + // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithFollowerInitiallyOutOfSync() { + logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(200, TimeUnit.MILLISECONDS)); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + + // Sync up the follower. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). 
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithFollowerSyncTimeout() { + logStart("testTransferLeadershipWithFollowerSyncTimeout"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(200, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, 0); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + + // Send heartbeats to time out the transfer. + for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) { + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). 
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + } + + verify(mockTransferCohort).abortTransfer(); + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100); + } + + @Test + public void testTransferLeadershipWithNoFollowers() { + logStart("testTransferLeadershipWithNoFollowers"); + + MockRaftActorContext leaderActorContext = createActorContext(); + + leader = new Leader(leaderActorContext); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort).transferComplete(); + } + @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception {