From 00570ac6512cdeea8d3c1da85e4a2e118a6f925c Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Sat, 19 Jul 2014 19:49:42 -0700 Subject: [PATCH] Remove the confusion around suggestedState The code has been refactored so that it is clear how exactly a state is applied. Earlier the state was being passed around and the decision of which state was returned was not clear. Tests were modified as appropriate. Change-Id: Ifb3d339affcc206d1358119defd4b7400e247f8b Signed-off-by: Moiz Raja --- .../behaviors/AbstractRaftActorBehavior.java | 63 +++------------- .../cluster/raft/behaviors/Candidate.java | 31 ++++---- .../cluster/raft/behaviors/Follower.java | 23 ++++-- .../cluster/raft/behaviors/Leader.java | 31 ++++---- .../AbstractRaftActorBehaviorTest.java | 73 +++++++++++-------- .../cluster/raft/behaviors/FollowerTest.java | 9 ++- 6 files changed, 114 insertions(+), 116 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index d12789205e..c64855ff7d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -18,7 +18,6 @@ import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; @@ -89,13 +88,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntries The AppendEntries message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the AppendEntries - * message * @return */ protected abstract RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState); + AppendEntries appendEntries); /** @@ -104,16 +100,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender * @param appendEntries - * @param raftState * @return */ protected RaftState appendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState raftState) { - - if (raftState != state()) { - context.getLogger().debug("Suggested state is " + raftState - + " current behavior state is " + state()); - } + AppendEntries appendEntries) { // 1. Reply false if term < currentTerm (§5.1) if (appendEntries.getTerm() < currentTerm()) { @@ -128,7 +118,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } - return handleAppendEntries(sender, appendEntries, raftState); + return handleAppendEntries(sender, appendEntries); } /** @@ -141,13 +131,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntriesReply The AppendEntriesReply message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the - * AppendEntriesReply message * @return */ protected abstract RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState); + AppendEntriesReply appendEntriesReply); /** * requestVote handles the RequestVote message. This logic is common @@ -155,11 +142,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender * @param requestVote - * @param suggestedState * @return */ protected RaftState requestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState) { + RequestVote requestVote) { boolean grantVote = false; @@ -197,7 +183,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor()); - return suggestedState; + return state(); } /** @@ -210,13 +196,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param requestVoteReply The RequestVoteReply message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the RequestVote - * message * @return */ protected abstract RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState); + RequestVoteReply requestVoteReply); /** * Creates a random election duration @@ -368,41 +351,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { @Override public RaftState handleMessage(ActorRef sender, Object message) { - RaftState raftState = state(); - if (message instanceof RaftRPC) { - raftState = applyTerm((RaftRPC) message); - } if (message instanceof AppendEntries) { - raftState = appendEntries(sender, (AppendEntries) message, - raftState); + return appendEntries(sender, (AppendEntries) message); } else if (message instanceof AppendEntriesReply) { - raftState = - handleAppendEntriesReply(sender, (AppendEntriesReply) message, - raftState); + return handleAppendEntriesReply(sender, (AppendEntriesReply) message); } else if (message instanceof RequestVote) { - raftState = - requestVote(sender, (RequestVote) message, raftState); + return requestVote(sender, (RequestVote) message); } else if (message instanceof RequestVoteReply) { - raftState = - handleRequestVoteReply(sender, (RequestVoteReply) message, - raftState); + return handleRequestVoteReply(sender, (RequestVoteReply) message); } - return raftState; + return state(); } @Override public String getLeaderId() { return leaderId; } - - private RaftState applyTerm(RaftRPC rpc) { - // If RPC request or response contains term T > currentTerm: - // set currentTerm = T, convert to follower (§5.1) - // This applies to all RPC messages and responses - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - context.getTermInformation().update(rpc.getTerm(), null); - return RaftState.Follower; - } - return state(); - } - } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index 8d84590426..13f5f1d7a4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -15,6 +15,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; @@ -84,27 +85,19 @@ public class Candidate extends AbstractRaftActorBehavior { } @Override protected RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState) { + AppendEntries appendEntries) { - context.getLogger().error("An unexpected AppendEntries received in state " + state()); - - return suggestedState; + return state(); } @Override protected RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState) { - - // Some peer thinks I was a leader and sent me a reply + AppendEntriesReply appendEntriesReply) { - return suggestedState; + return state(); } @Override protected RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState) { - if (suggestedState == RaftState.Follower) { - // If base class thinks I should be follower then I am - return suggestedState; - } + RequestVoteReply requestVoteReply) { if (requestVoteReply.isVoteGranted()) { voteCount++; @@ -123,6 +116,18 @@ public class Candidate extends AbstractRaftActorBehavior { @Override public RaftState handleMessage(ActorRef sender, Object message) { + + if (message instanceof RaftRPC) { + RaftRPC rpc = (RaftRPC) message; + // If RPC request or response contains term T > currentTerm: + // set currentTerm = T, convert to follower (§5.1) + // This applies to all RPC messages and responses + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + context.getTermInformation().update(rpc.getTerm(), null); + return RaftState.Follower; + } + } + if (message instanceof ElectionTimeout) { if (votesRequired == 0) { // If there are no peers then we should be a Leader diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index dd2f19b137..823d563a74 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeou import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; /** @@ -37,7 +38,7 @@ public class Follower extends AbstractRaftActorBehavior { } @Override protected RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState) { + AppendEntries appendEntries) { // If we got here then we do appear to be talking to the leader leaderId = appendEntries.getLeaderId(); @@ -170,17 +171,17 @@ public class Follower extends AbstractRaftActorBehavior { sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true, lastIndex(), lastTerm()), actor()); - return suggestedState; + return state(); } @Override protected RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState) { - return suggestedState; + AppendEntriesReply appendEntriesReply) { + return state(); } @Override protected RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState) { - return suggestedState; + RequestVoteReply requestVoteReply) { + return state(); } @Override public RaftState state() { @@ -188,6 +189,16 @@ public class Follower extends AbstractRaftActorBehavior { } @Override public RaftState handleMessage(ActorRef sender, Object message) { + if (message instanceof RaftRPC) { + RaftRPC rpc = (RaftRPC) message; + // If RPC request or response contains term T > currentTerm: + // set currentTerm = T, convert to follower (§5.1) + // This applies to all RPC messages and responses + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + context.getTermInformation().update(rpc.getTerm(), null); + } + } + if (message instanceof ElectionTimeout) { return RaftState.Candidate; } else if (message instanceof InstallSnapshot) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 857c87f0ac..a647e17a20 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -27,6 +27,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; @@ -118,20 +119,13 @@ public class Leader extends AbstractRaftActorBehavior { } @Override protected RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState) { + AppendEntries appendEntries) { - context.getLogger() - .error("An unexpected AppendEntries received in state " + state()); - - return suggestedState; + return state(); } @Override protected RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState) { - - // Do not take any other action since a behavior change is coming - if (suggestedState != state()) - return suggestedState; + AppendEntriesReply appendEntriesReply) { // Update the FollowerLogInformation String followerId = appendEntriesReply.getFollowerId(); @@ -177,7 +171,7 @@ public class Leader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } - return suggestedState; + return state(); } protected ClientRequestTracker findClientRequestTracker(long logIndex) { @@ -191,8 +185,8 @@ public class Leader extends AbstractRaftActorBehavior { } @Override protected RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState) { - return suggestedState; + RequestVoteReply requestVoteReply) { + return state(); } @Override public RaftState state() { @@ -202,6 +196,17 @@ public class Leader extends AbstractRaftActorBehavior { @Override public RaftState handleMessage(ActorRef sender, Object message) { Preconditions.checkNotNull(sender, "sender should not be null"); + if (message instanceof RaftRPC) { + RaftRPC rpc = (RaftRPC) message; + // If RPC request or response contains term T > currentTerm: + // set currentTerm = T, convert to follower (§5.1) + // This applies to all RPC messages and responses + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + context.getTermInformation().update(rpc.getTerm(), null); + return RaftState.Follower; + } + } + try { if (message instanceof SendHeartBeat) { return sendHeartBeat(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index a7b6825c7d..1a37b921e3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -169,28 +169,33 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { new Within(duration("1 seconds")) { protected void run() { - RaftActorBehavior follower = createBehavior( + RaftActorBehavior behavior = createBehavior( createActorContext(behaviorActor)); - follower.handleMessage(getTestActor(), + RaftState raftState = behavior.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999)); - final Boolean out = - new ExpectMsg(duration("1 seconds"), - "RequestVoteReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof RequestVoteReply) { - RequestVoteReply reply = - (RequestVoteReply) in; - return reply.isVoteGranted(); - } else { - throw noMatch(); + if(behavior.state() != RaftState.Follower){ + assertEquals(RaftState.Follower, raftState); + } else { + + final Boolean out = + new ExpectMsg(duration("1 seconds"), + "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = + (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } } - } - }.get(); + }.get(); - assertEquals(true, out); + assertEquals(true, out); + } } }; }}; @@ -219,27 +224,31 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { ((MockRaftActorContext) actorContext).setReplicatedLog(log); - RaftActorBehavior follower = createBehavior(actorContext); + RaftActorBehavior behavior = createBehavior(actorContext); - follower.handleMessage(getTestActor(), + RaftState raftState = behavior.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999)); - final Boolean out = - new ExpectMsg(duration("1 seconds"), - "RequestVoteReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof RequestVoteReply) { - RequestVoteReply reply = - (RequestVoteReply) in; - return reply.isVoteGranted(); - } else { - throw noMatch(); + if(behavior.state() != RaftState.Follower){ + assertEquals(RaftState.Follower, raftState); + } else { + final Boolean out = + new ExpectMsg(duration("1 seconds"), + "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = + (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } } - } - }.get(); + }.get(); - assertEquals(false, out); + assertEquals(false, out); + } } }; }}; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 677442402c..f6a6217d08 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.junit.Assert.assertEquals; @@ -158,9 +159,15 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { context.setLastApplied(100); setLastLogEntry((MockRaftActorContext) context, 0, 0, ""); + List entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101, + "foo") + ); + // The new commitIndex is 101 AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, null, 101); + new AppendEntries(100, "leader-1", 0, 0, entries, 101); RaftState raftState = createBehavior(context).handleMessage(getRef(), appendEntries); -- 2.36.6