import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import scala.concurrent.duration.FiniteDuration;
}
@Override protected RaftState handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState suggestedState) {
+ AppendEntries appendEntries) {
- context.getLogger()
- .error("An unexpected AppendEntries received in state " + state());
-
- return suggestedState;
+ return state();
}
@Override protected RaftState handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
-
- // Do not take any other action since a behavior change is coming
- if (suggestedState != state())
- return suggestedState;
+ AppendEntriesReply appendEntriesReply) {
// Update the FollowerLogInformation
String followerId = appendEntriesReply.getFollowerId();
followerLogInformation
.setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
} else {
+
+ // TODO: When we find that the follower is out of sync with the
+ // Leader we simply decrement that followers next index by 1.
+ // Would it be possible to do better than this? The RAFT spec
+ // does not explicitly deal with it but may be something for us to
+ // think about
+
followerLogInformation.decrNextIndex();
}
applyLogToStateMachine(context.getCommitIndex());
}
- return suggestedState;
+ return state();
}
protected ClientRequestTracker findClientRequestTracker(long logIndex) {
}
@Override protected RaftState handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply, RaftState suggestedState) {
- return suggestedState;
+ RequestVoteReply requestVoteReply) {
+ return state();
}
@Override public RaftState state() {
return RaftState.Leader;
}
- @Override public RaftState handleMessage(ActorRef sender, Object message) {
+ @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
Preconditions.checkNotNull(sender, "sender should not be null");
+ Object message = fromSerializableMessage(originalMessage);
+
+ if (message instanceof RaftRPC) {
+ RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (ยง5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ return RaftState.Follower;
+ }
+ }
+
try {
if (message instanceof SendHeartBeat) {
return sendHeartBeat();
} else if (message instanceof Replicate) {
replicate((Replicate) message);
} else if (message instanceof InstallSnapshotReply){
- // FIXME : Should I be checking the term here too?
handleInstallSnapshotReply(
(InstallSnapshotReply) message);
}
List<ReplicatedLogEntry> entries = Collections.emptyList();
if(context.getReplicatedLog().isPresent(nextIndex)){
+ // TODO: Instead of sending all entries from nextIndex
+ // only send a fixed number of entries to each follower
+ // This is to avoid the situation where there are a lot of
+ // entries to install for a fresh follower or to a follower
+ // that has fallen too far behind with the log but yet is not
+ // eligible to receive a snapshot
entries =
context.getReplicatedLog().getFrom(nextIndex);
}
followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(nextIndex), prevLogTerm(nextIndex),
- entries, context.getCommitIndex()
- ),
- actor()
- );
+ new AppendEntries(currentTerm(), context.getId(), prevLogIndex(nextIndex),
+ prevLogTerm(nextIndex), entries, context.getCommitIndex()).toSerializable(),
+ actor());
}
}
+ /**
+ * An installSnapshot is scheduled at a interval that is a multiple of
+ * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+ * snapshots at every heartbeat.
+ */
private void installSnapshotIfNeeded(){
for (String followerId : followerToActor.keySet()) {
ActorSelection followerActor =