import akka.actor.ActorRef;
import akka.actor.Cancellable;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
/**
*
*/
-
private Cancellable electionCancel = null;
+ /**
+ *
+ */
+ protected String leaderId = null;
+
protected AbstractRaftActorBehavior(RaftActorContext context) {
this.context = context;
protected RaftState appendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState raftState){
+ AppendEntries appendEntries, RaftState raftState) {
- // 1. Reply false if term < currentTerm (§5.1)
- if(appendEntries.getTerm() < currentTerm()){
- sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
- return state();
+ if (raftState != state()) {
+ context.getLogger().debug("Suggested state is " + raftState
+ + " current behavior state is " + state());
}
- // 2. Reply false if log doesn’t contain an entry at prevLogIndex
- // whose term matches prevLogTerm (§5.3)
- ReplicatedLogEntry previousEntry = context.getReplicatedLog()
- .get(appendEntries.getPrevLogIndex());
-
- if(previousEntry == null || previousEntry.getTerm() != appendEntries.getPrevLogTerm()){
- sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
+ // 1. Reply false if term < currentTerm (§5.1)
+ if (appendEntries.getTerm() < currentTerm()) {
+ context.getLogger().debug(
+ "Cannot append entries because sender term " + appendEntries
+ .getTerm() + " is less than " + currentTerm());
+ sender.tell(
+ new AppendEntriesReply(context.getId(), currentTerm(), false,
+ lastIndex(), lastTerm()), actor()
+ );
return state();
}
- if(appendEntries.getEntries() != null) {
- // 3. If an existing entry conflicts with a new one (same index
- // but different terms), delete the existing entry and all that
- // follow it (§5.3)
- int addEntriesFrom = 0;
- for (int i = 0;
- i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
- ReplicatedLogEntry newEntry = context.getReplicatedLog()
- .get(i + 1);
-
- if (newEntry != null && newEntry.getTerm() == appendEntries.getEntries().get(i).getTerm()){
- break;
- }
- if (newEntry != null && newEntry.getTerm() != appendEntries
- .getEntries().get(i).getTerm()) {
- context.getReplicatedLog().removeFrom(i + 1);
- break;
- }
- }
-
- // 4. Append any new entries not already in the log
- for (int i = addEntriesFrom;
- i < appendEntries.getEntries().size(); i++) {
- context.getReplicatedLog()
- .append(appendEntries.getEntries().get(i));
- }
- }
-
-
- // 5. If leaderCommit > commitIndex, set commitIndex =
- // min(leaderCommit, index of last new entry)
- context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
- context.getReplicatedLog().last().getIndex()));
-
- // If commitIndex > lastApplied: increment lastApplied, apply
- // log[lastApplied] to state machine (§5.3)
- if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
- applyLogToStateMachine(appendEntries.getLeaderCommit());
- }
-
- sender.tell(new AppendEntriesReply(currentTerm(), true), actor());
return handleAppendEntries(sender, appendEntries, raftState);
}
if (requestVote.getLastLogTerm() > lastTerm()) {
candidateLatest = true;
} else if ((requestVote.getLastLogTerm() == lastTerm())
- && requestVote.getLastLogIndex() >= lastTerm()) {
+ && requestVote.getLastLogIndex() >= lastIndex()) {
candidateLatest = true;
}
protected abstract RaftState handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply, RaftState suggestedState);
- /**
- * @return The derived class should return the state that corresponds to
- * it's behavior
- */
- protected abstract RaftState state();
-
protected FiniteDuration electionDuration() {
long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
TimeUnit.MILLISECONDS);
}
- protected void scheduleElection(FiniteDuration interval) {
-
+ protected void stopElection() {
if (electionCancel != null && !electionCancel.isCancelled()) {
electionCancel.cancel();
}
+ }
+
+ protected void scheduleElection(FiniteDuration interval) {
+
+ stopElection();
// Schedule an election. When the scheduler triggers an ElectionTimeout
// message is sent to itself
}
protected long lastTerm() {
- return context.getReplicatedLog().last().getTerm();
+ return context.getReplicatedLog().lastTerm();
}
protected long lastIndex() {
- return context.getReplicatedLog().last().getIndex();
+ return context.getReplicatedLog().lastIndex();
}
+ protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+ return null;
+ }
+
+ protected void applyLogToStateMachine(long index) {
+ // Now maybe we apply to the state machine
+ for (long i = context.getLastApplied() + 1;
+ i < index + 1; i++) {
+ ActorRef clientActor = null;
+ String identifier = null;
+ ClientRequestTracker tracker = findClientRequestTracker(i);
+
+ if (tracker != null) {
+ clientActor = tracker.getClientActor();
+ identifier = tracker.getIdentifier();
+ }
+ ReplicatedLogEntry replicatedLogEntry =
+ context.getReplicatedLog().get(i);
+
+ if (replicatedLogEntry != null) {
+ actor().tell(new ApplyState(clientActor, identifier,
+ replicatedLogEntry), actor());
+ } else {
+ context.getLogger().error(
+ "Missing index " + i + " from log. Cannot apply state.");
+ }
+ }
+ // Send a local message to the local RaftActor (it's derived class to be
+ // specific to apply the log to it's index)
+ context.setLastApplied(index);
+ }
@Override
public RaftState handleMessage(ActorRef sender, Object message) {
return raftState;
}
+ @Override public String getLeaderId() {
+ return leaderId;
+ }
+
private RaftState applyTerm(RaftRPC rpc) {
// If RPC request or response contains term T > currentTerm:
// set currentTerm = T, convert to follower (§5.1)
return state();
}
- private void applyLogToStateMachine(long index) {
- // Send a local message to the local RaftActor (it's derived class to be
- // specific to apply the log to it's index)
- context.setLastApplied(index);
- }
-
-
}