BUG 4213 : Candidate should switch to Follower when it receives AppendEntries from... 07/26107/1
authorMoiz Raja <moraja@cisco.com>
Thu, 27 Aug 2015 18:07:08 +0000 (11:07 -0700)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 27 Aug 2015 19:08:38 +0000 (19:08 +0000)
Multiple peers might become candidates in a single election term. If one peer happened
to become a Leader it will send AppendEntries to all it's peers. When a Candidate receives
an AppendEntries and finds its term to be the same as the AppendEntries term then it should
switch to Follower.

Change-Id: Ia4ce41d4f3eefed50297b90107ad7429bb950ad8
Signed-off-by: Moiz Raja <moraja@cisco.com>
(cherry picked from commit 6b66fc7266ea909427b612da16a587936890b77b)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java

index 74bede171f1f6e6ad6b33feef6806d3c77321581..196456440b468794b21b60380934d705bfbf3234 100644 (file)
@@ -75,6 +75,14 @@ public class Candidate extends AbstractRaftActorBehavior {
             LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
         }
 
+        // Some other candidate for the same term became a leader and sent us an append entry
+        if(currentTerm() == appendEntries.getTerm()){
+            LOG.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
+                    logName(), currentTerm());
+
+            return switchBehavior(new Follower(context));
+        }
+
         return this;
     }
 
index 69d86a4c41990bb36a71401bd4cc99564f6af15d..496f7b071a927d9475c58cbd5b042a885f085b9e 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
@@ -23,7 +24,6 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.AbstractActorTest;
-import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.LoggerFactory;
@@ -135,14 +136,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
 
         behavior = createBehavior(context);
 
-        if (behavior instanceof Candidate) {
-            // Resetting the Candidates term to make sure it will match
-            // the term sent by AppendEntries. If this was not done then
-            // the test will fail because the Candidate will assume that
-            // the message was sent to it from a lower term peer and will
-            // thus respond with a failure
-            context.getTermInformation().update(2, "test");
-        }
+        assertFalse("This test should be overridden when testing Candidate", behavior instanceof Candidate);
 
         // Send an unknown message so that the state of the RaftActor remains unchanged
         RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
index 9b5a5abb9c06f7a77800171ba146e61b42bfc1d6..c67395b9ded202b8a179241d9f46e3b804703cb3 100644 (file)
@@ -14,8 +14,10 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Stopwatch;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
@@ -134,15 +136,42 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         candidate = new Candidate(createActorContext());
 
         setupPeers(1);
-        candidate.handleMessage(peerActors[0], new AppendEntries(1, "test", 0, 0,
-                Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)0));
+        RaftActorBehavior newBehavior = candidate.handleMessage(peerActors[0], new AppendEntries(1, "test", 0, 0,
+                Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short) 0));
 
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
                 peerActors[0], AppendEntriesReply.class);
         assertEquals("isSuccess", false, reply.isSuccess());
         assertEquals("getTerm", 2, reply.getTerm());
+        assertTrue("New Behavior : " + newBehavior, newBehavior instanceof Candidate);
     }
 
+    @Test
+    public void testResponseToHandleAppendEntriesWithHigherTerm() {
+        candidate = new Candidate(createActorContext());
+
+        setupPeers(1);
+        RaftActorBehavior newBehavior = candidate.handleMessage(peerActors[0], new AppendEntries(5, "test", 0, 0,
+                Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short) 0));
+
+        assertTrue("New Behavior : " + newBehavior, newBehavior instanceof Follower);
+    }
+
+    @Test
+    public void testResponseToHandleAppendEntriesWithEqualTerm() {
+        MockRaftActorContext actorContext = createActorContext();
+
+        candidate = new Candidate(actorContext);
+
+        setupPeers(1);
+        RaftActorBehavior newBehavior = candidate.handleMessage(peerActors[0], new AppendEntries(2, "test", 0, 0,
+                Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short) 0));
+
+        assertTrue("New Behavior : " + newBehavior + " term = " + actorContext.getTermInformation().getCurrentTerm(),
+                newBehavior instanceof Follower);
+    }
+
+
     @Test
     public void testResponseToRequestVoteWithLowerTerm() {
         candidate = new Candidate(createActorContext());
@@ -210,6 +239,42 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
     }
 
+    @Test
+    @Override
+    public void testHandleAppendEntriesAddSameEntryToLog() throws Exception {
+        MockRaftActorContext context = createActorContext();
+
+        context.getTermInformation().update(2, "test");
+
+        // Prepare the receivers log
+        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("zero");
+        setLastLogEntry(context, 2, 0, payload);
+
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+        entries.add(new MockRaftActorContext.MockReplicatedLogEntry(2, 0, payload));
+
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1, (short)0);
+
+        behavior = createBehavior(context);
+
+        // Resetting the Candidates term to make sure it will match
+        // the term sent by AppendEntries. If this was not done then
+        // the test will fail because the Candidate will assume that
+        // the message was sent to it from a lower term peer and will
+        // thus respond with a failure
+        context.getTermInformation().update(2, "test");
+
+        // Send an unknown message so that the state of the RaftActor remains unchanged
+        RaftActorBehavior expected = behavior.handleMessage(candidateActor, "unknown");
+
+        RaftActorBehavior raftBehavior = behavior.handleMessage(candidateActor, appendEntries);
+
+        assertEquals("Raft state", RaftState.Follower, raftBehavior.state());
+
+        assertEquals("ReplicatedLog size", 1, context.getReplicatedLog().size());
+
+        handleAppendEntriesAddSameEntryToLogReply(candidateActor);
+    }
 
     @Override
     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {