Add RaftActorLeadershipTransferCohort and implement transfer in Leader 08/31508/5
authorTom Pantelis <tpanteli@brocade.com>
Wed, 16 Dec 2015 08:57:15 +0000 (03:57 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 21 Dec 2015 11:32:01 +0000 (06:32 -0500)
Added a transferLeadership method to Leader that takes a
RaftActorLeadershipTransferCohort to be notified when transfer is
complete.

The leader looks for a follower via AppendEntriesReply whose cached
matchIndex matches the leader's last index. If one is found, it sends
an additional AppendEntries to ensure the follower has applied all its
log entries to its state. It then sends an ElectionTimeout to immediately
start an electioni and notifies the RaftActorLeadershipTransferCohort
via transferComplete.

If no matching follower is found initially, the leader tries again at
the next heartbeat interval via AppendEntriesReply. This continues until
either a matching follower is found or the election timeout period
elapses. The latter is checked at each heartbeat interval via
beforeSendHeartbeat. On time out, it notifies the
RaftActorLeadershipTransferCohort via abortTransfer.

Change-Id: I841e13fdde27ee57b9789a4df6f69bf9901c1e79
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
new file mode 100644 (file)
index 0000000..77678e9
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+/**
+ * A helper class that participates in raft actor leadership transfer. An instance is created upon
+ * initialization of leadership transfer.
+ * <p>
+ * NOTE: All methods on this class must be called on the actor's thread dispatcher as they modify internal state.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class RaftActorLeadershipTransferCohort {
+    private final RaftActor raftActor;
+
+    protected RaftActorLeadershipTransferCohort(RaftActor raftActor) {
+        this.raftActor = raftActor;
+    }
+
+    /**
+     * This method is invoked to start leadership transfer.
+     */
+    public void startTransfer() {
+        RaftActorBehavior behavior = raftActor.getCurrentBehavior();
+        if(behavior instanceof Leader) {
+            ((Leader)behavior).transferLeadership(this);
+        }
+    }
+
+    /**
+     * This method is invoked to abort leadership transfer.
+     */
+    public void abortTransfer() {
+        transferComplete();
+    }
+
+    /**
+     * This method is invoked when leadership transfer is complete.
+     */
+    public abstract void transferComplete();
+}
index 497d98cdce13cded64be94696591ffdb171f22d3..f1a178d0cac9fb51737b449edf7cbc3593cd745f 100644 (file)
@@ -509,7 +509,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
-    private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
+    protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
         // Send an AppendEntries to all followers
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
index 6d3e3644674d88c4d1709781089949a66392d52d..6bbb70ce6b71ebb2ed7077b4962235fbf13f5cdf 100644 (file)
@@ -8,13 +8,20 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 
 /**
  * The behavior of a RaftActor when it is in the Leader state
@@ -41,6 +48,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderChec
 public class Leader extends AbstractLeader {
     private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
     private final Stopwatch isolatedLeaderCheck;
+    private @Nullable LeadershipTransferContext leadershipTransferContext;
 
     public Leader(RaftActorContext context) {
         super(context);
@@ -69,10 +77,73 @@ public class Leader extends AbstractLeader {
             isolatedLeaderCheck.reset().start();
         }
 
+        if(leadershipTransferContext != null && leadershipTransferContext.isExpired(
+                context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
+            LOG.debug("{}: Leadership transfer expired", logName());
+            leadershipTransferContext = null;
+        }
+    }
+
+    @Override
+    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
+        RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
+        tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
+        return returnBehavior;
+    }
+
+    public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
+        if(!context.hasFollowers()) {
+            leadershipTransferCohort.transferComplete();
+            return;
+        }
+
+        LOG.debug("{}: Attempting to transfer leadership", logName());
+
+        leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
+
+        // Send an immediate heart beat to the followers.
+        sendAppendEntries(0, false);
+    }
+
+    private void tryToCompleteLeadershipTransfer(String followerId) {
+        if(leadershipTransferContext == null) {
+            return;
+        }
+
+        FollowerLogInformation followerInfo = getFollower(followerId);
+        if(followerInfo == null) {
+            return;
+        }
+
+        long lastIndex = context.getReplicatedLog().lastIndex();
+
+        LOG.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}",
+                logName(), followerId, followerInfo.getMatchIndex(), lastIndex);
+
+        if(followerInfo.getMatchIndex() == lastIndex) {
+            LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
+
+            // We can't be sure if the follower has applied all its log entries to its state so send an
+            // additional AppendEntries.
+            sendAppendEntries(0, false);
+
+            // Now send an ElectionTimeout to the matching follower to immediately start an election.
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+            followerActor.tell(new ElectionTimeout(), context.getActor());
+
+            LOG.debug("{}: Leader transfer complete", logName());
+
+            leadershipTransferContext.transferCohort.transferComplete();
+            leadershipTransferContext = null;
+        }
     }
 
     @Override
     public void close() throws Exception {
+        if(leadershipTransferContext != null) {
+            leadershipTransferContext.transferCohort.abortTransfer();
+        }
+
         super.close();
     }
 
@@ -85,4 +156,22 @@ public class Leader extends AbstractLeader {
     void markFollowerInActive(String followerId) {
         getFollower(followerId).markFollowerInActive();
     }
+
+    private static class LeadershipTransferContext {
+        RaftActorLeadershipTransferCohort transferCohort;
+        Stopwatch timer = Stopwatch.createStarted();
+
+        LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) {
+            this.transferCohort = transferCohort;
+        }
+
+        boolean isExpired(long timeout) {
+            if(timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
+                transferCohort.abortTransfer();
+                return true;
+            }
+
+            return false;
+        }
+    }
 }
index 6664600073f2c727936ece4da93f6cd5b76bb290..c39c62c727b601b0dd20c9e59098345604490798 100644 (file)
@@ -11,6 +11,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -32,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
@@ -41,6 +45,7 @@ import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
@@ -1943,6 +1948,166 @@ public class LeaderTest extends AbstractLeaderTest {
         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
     }
 
+    @Test
+    public void testTransferLeadershipWithFollowerInSync() {
+        logStart("testTransferLeadershipWithFollowerInSync");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        sendReplicate(leaderActorContext, 0);
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+        MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+        // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+        MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+        verify(mockTransferCohort).transferComplete();
+    }
+
+    @Test
+    public void testTransferLeadershipWithEmptyLog() {
+        logStart("testTransferLeadershipWithEmptyLog");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+
+        // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+        verify(mockTransferCohort).transferComplete();
+    }
+
+    @Test
+    public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
+        logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+
+        // Sync up the follower.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+                getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+        verify(mockTransferCohort).transferComplete();
+    }
+
+    @Test
+    public void testTransferLeadershipWithFollowerSyncTimeout() {
+        logStart("testTransferLeadershipWithFollowerSyncTimeout");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(200, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        sendReplicate(leaderActorContext, 0);
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+
+        // Send heartbeats to time out the transfer.
+        for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+                    getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+        }
+
+        verify(mockTransferCohort).abortTransfer();
+        verify(mockTransferCohort, never()).transferComplete();
+        MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
+    }
+
+    @Test
+    public void testTransferLeadershipWithNoFollowers() {
+        logStart("testTransferLeadershipWithNoFollowers");
+
+        MockRaftActorContext leaderActorContext = createActorContext();
+
+        leader = new Leader(leaderActorContext);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort).transferComplete();
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {