import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import scala.concurrent.duration.FiniteDuration;
*
* @param sender The actor that sent this message
* @param appendEntries The AppendEntries message
- * @param suggestedState The state that the RaftActor should be in based
- * on the base class's processing of the AppendEntries
- * message
* @return
*/
protected abstract RaftState handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState suggestedState);
+ AppendEntries appendEntries);
/**
*
* @param sender
* @param appendEntries
- * @param raftState
* @return
*/
protected RaftState appendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState raftState) {
-
- if (raftState != state()) {
- context.getLogger().debug("Suggested state is " + raftState
- + " current behavior state is " + state());
- }
+ AppendEntries appendEntries) {
// 1. Reply false if term < currentTerm (§5.1)
if (appendEntries.getTerm() < currentTerm()) {
}
- return handleAppendEntries(sender, appendEntries, raftState);
+ return handleAppendEntries(sender, appendEntries);
}
/**
*
* @param sender The actor that sent this message
* @param appendEntriesReply The AppendEntriesReply message
- * @param suggestedState The state that the RaftActor should be in based
- * on the base class's processing of the
- * AppendEntriesReply message
* @return
*/
protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply, RaftState suggestedState);
+ AppendEntriesReply appendEntriesReply);
/**
* requestVote handles the RequestVote message. This logic is common
*
* @param sender
* @param requestVote
- * @param suggestedState
* @return
*/
protected RaftState requestVote(ActorRef sender,
- RequestVote requestVote, RaftState suggestedState) {
+ RequestVote requestVote) {
boolean grantVote = false;
sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
- return suggestedState;
+ return state();
}
/**
*
* @param sender The actor that sent this message
* @param requestVoteReply The RequestVoteReply message
- * @param suggestedState The state that the RaftActor should be in based
- * on the base class's processing of the RequestVote
- * message
* @return
*/
protected abstract RaftState handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply, RaftState suggestedState);
+ RequestVoteReply requestVoteReply);
/**
* Creates a random election duration
@Override
public RaftState handleMessage(ActorRef sender, Object message) {
- RaftState raftState = state();
- if (message instanceof RaftRPC) {
- raftState = applyTerm((RaftRPC) message);
- }
if (message instanceof AppendEntries) {
- raftState = appendEntries(sender, (AppendEntries) message,
- raftState);
+ return appendEntries(sender, (AppendEntries) message);
} else if (message instanceof AppendEntriesReply) {
- raftState =
- handleAppendEntriesReply(sender, (AppendEntriesReply) message,
- raftState);
+ return handleAppendEntriesReply(sender, (AppendEntriesReply) message);
} else if (message instanceof RequestVote) {
- raftState =
- requestVote(sender, (RequestVote) message, raftState);
+ return requestVote(sender, (RequestVote) message);
} else if (message instanceof RequestVoteReply) {
- raftState =
- handleRequestVoteReply(sender, (RequestVoteReply) message,
- raftState);
+ return handleRequestVoteReply(sender, (RequestVoteReply) message);
}
- return raftState;
+ return state();
}
@Override public String getLeaderId() {
return leaderId;
}
-
- private RaftState applyTerm(RaftRPC rpc) {
- // If RPC request or response contains term T > currentTerm:
- // set currentTerm = T, convert to follower (§5.1)
- // This applies to all RPC messages and responses
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- context.getTermInformation().update(rpc.getTerm(), null);
- return RaftState.Follower;
- }
- return state();
- }
-
}
import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
}
@Override protected RaftState handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState suggestedState) {
+ AppendEntries appendEntries) {
- context.getLogger().error("An unexpected AppendEntries received in state " + state());
-
- return suggestedState;
+ return state();
}
@Override protected RaftState handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
-
- // Some peer thinks I was a leader and sent me a reply
+ AppendEntriesReply appendEntriesReply) {
- return suggestedState;
+ return state();
}
@Override protected RaftState handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply, RaftState suggestedState) {
- if (suggestedState == RaftState.Follower) {
- // If base class thinks I should be follower then I am
- return suggestedState;
- }
+ RequestVoteReply requestVoteReply) {
if (requestVoteReply.isVoteGranted()) {
voteCount++;
@Override
public RaftState handleMessage(ActorRef sender, Object message) {
+
+ if (message instanceof RaftRPC) {
+ RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ context.getTermInformation().update(rpc.getTerm(), null);
+ return RaftState.Follower;
+ }
+ }
+
if (message instanceof ElectionTimeout) {
if (votesRequired == 0) {
// If there are no peers then we should be a Leader
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
/**
}
@Override protected RaftState handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState suggestedState) {
+ AppendEntries appendEntries) {
// If we got here then we do appear to be talking to the leader
leaderId = appendEntries.getLeaderId();
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm()), actor());
- return suggestedState;
+ return state();
}
@Override protected RaftState handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
- return suggestedState;
+ AppendEntriesReply appendEntriesReply) {
+ return state();
}
@Override protected RaftState handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply, RaftState suggestedState) {
- return suggestedState;
+ RequestVoteReply requestVoteReply) {
+ return state();
}
@Override public RaftState state() {
}
@Override public RaftState handleMessage(ActorRef sender, Object message) {
+ if (message instanceof RaftRPC) {
+ RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ context.getTermInformation().update(rpc.getTerm(), null);
+ }
+ }
+
if (message instanceof ElectionTimeout) {
return RaftState.Candidate;
} else if (message instanceof InstallSnapshot) {
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import scala.concurrent.duration.FiniteDuration;
}
@Override protected RaftState handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState suggestedState) {
+ AppendEntries appendEntries) {
- context.getLogger()
- .error("An unexpected AppendEntries received in state " + state());
-
- return suggestedState;
+ return state();
}
@Override protected RaftState handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
-
- // Do not take any other action since a behavior change is coming
- if (suggestedState != state())
- return suggestedState;
+ AppendEntriesReply appendEntriesReply) {
// Update the FollowerLogInformation
String followerId = appendEntriesReply.getFollowerId();
applyLogToStateMachine(context.getCommitIndex());
}
- return suggestedState;
+ return state();
}
protected ClientRequestTracker findClientRequestTracker(long logIndex) {
}
@Override protected RaftState handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply, RaftState suggestedState) {
- return suggestedState;
+ RequestVoteReply requestVoteReply) {
+ return state();
}
@Override public RaftState state() {
@Override public RaftState handleMessage(ActorRef sender, Object message) {
Preconditions.checkNotNull(sender, "sender should not be null");
+ if (message instanceof RaftRPC) {
+ RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ context.getTermInformation().update(rpc.getTerm(), null);
+ return RaftState.Follower;
+ }
+ }
+
try {
if (message instanceof SendHeartBeat) {
return sendHeartBeat();
new Within(duration("1 seconds")) {
protected void run() {
- RaftActorBehavior follower = createBehavior(
+ RaftActorBehavior behavior = createBehavior(
createActorContext(behaviorActor));
- follower.handleMessage(getTestActor(),
+ RaftState raftState = behavior.handleMessage(getTestActor(),
new RequestVote(1000, "test", 10000, 999));
- final Boolean out =
- new ExpectMsg<Boolean>(duration("1 seconds"),
- "RequestVoteReply") {
- // do not put code outside this method, will run afterwards
- protected Boolean match(Object in) {
- if (in instanceof RequestVoteReply) {
- RequestVoteReply reply =
- (RequestVoteReply) in;
- return reply.isVoteGranted();
- } else {
- throw noMatch();
+ if(behavior.state() != RaftState.Follower){
+ assertEquals(RaftState.Follower, raftState);
+ } else {
+
+ final Boolean out =
+ new ExpectMsg<Boolean>(duration("1 seconds"),
+ "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply =
+ (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
}
- }
- }.get();
+ }.get();
- assertEquals(true, out);
+ assertEquals(true, out);
+ }
}
};
}};
((MockRaftActorContext) actorContext).setReplicatedLog(log);
- RaftActorBehavior follower = createBehavior(actorContext);
+ RaftActorBehavior behavior = createBehavior(actorContext);
- follower.handleMessage(getTestActor(),
+ RaftState raftState = behavior.handleMessage(getTestActor(),
new RequestVote(1000, "test", 10000, 999));
- final Boolean out =
- new ExpectMsg<Boolean>(duration("1 seconds"),
- "RequestVoteReply") {
- // do not put code outside this method, will run afterwards
- protected Boolean match(Object in) {
- if (in instanceof RequestVoteReply) {
- RequestVoteReply reply =
- (RequestVoteReply) in;
- return reply.isVoteGranted();
- } else {
- throw noMatch();
+ if(behavior.state() != RaftState.Follower){
+ assertEquals(RaftState.Follower, raftState);
+ } else {
+ final Boolean out =
+ new ExpectMsg<Boolean>(duration("1 seconds"),
+ "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply =
+ (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
}
- }
- }.get();
+ }.get();
- assertEquals(false, out);
+ assertEquals(false, out);
+ }
}
};
}};
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
context.setLastApplied(100);
setLastLogEntry((MockRaftActorContext) context, 0, 0, "");
+ List<ReplicatedLogEntry> entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101,
+ "foo")
+ );
+
// The new commitIndex is 101
AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
+ new AppendEntries(100, "leader-1", 0, 0, entries, 101);
RaftState raftState =
createBehavior(context).handleMessage(getRef(), appendEntries);