X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=304b2fdbab0b8c242ae01e4768547cd8d5861a46;hb=fdab53ef9033fc83c812f7d3d6d3327d3d176f0f;hp=167082711d82b46c61e90e8311473d051e06461c;hpb=a0c5aba42aa36337ff1c6760175918b786897c9e;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 167082711d..304b2fdbab 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,9 +10,11 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; +import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -65,9 +67,13 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { /** * */ - private Cancellable electionCancel = null; + /** + * + */ + protected String leaderId = null; + protected AbstractRaftActorBehavior(RaftActorContext context) { this.context = context; @@ -93,65 +99,25 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected RaftState appendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState raftState){ + AppendEntries appendEntries, RaftState raftState) { - // 1. Reply false if term < currentTerm (§5.1) - if(appendEntries.getTerm() < currentTerm()){ - sender.tell(new AppendEntriesReply(currentTerm(), false), actor()); - return state(); + if (raftState != state()) { + context.getLogger().debug("Suggested state is " + raftState + + " current behavior state is " + state()); } - // 2. Reply false if log doesn’t contain an entry at prevLogIndex - // whose term matches prevLogTerm (§5.3) - ReplicatedLogEntry previousEntry = context.getReplicatedLog() - .get(appendEntries.getPrevLogIndex()); - - if(previousEntry == null || previousEntry.getTerm() != appendEntries.getPrevLogTerm()){ - sender.tell(new AppendEntriesReply(currentTerm(), false), actor()); + // 1. Reply false if term < currentTerm (§5.1) + if (appendEntries.getTerm() < currentTerm()) { + context.getLogger().debug( + "Cannot append entries because sender term " + appendEntries + .getTerm() + " is less than " + currentTerm()); + sender.tell( + new AppendEntriesReply(context.getId(), currentTerm(), false, + lastIndex(), lastTerm()), actor() + ); return state(); } - if(appendEntries.getEntries() != null) { - // 3. If an existing entry conflicts with a new one (same index - // but different terms), delete the existing entry and all that - // follow it (§5.3) - int addEntriesFrom = 0; - for (int i = 0; - i < appendEntries.getEntries().size(); i++, addEntriesFrom++) { - ReplicatedLogEntry newEntry = context.getReplicatedLog() - .get(i + 1); - - if (newEntry != null && newEntry.getTerm() == appendEntries.getEntries().get(i).getTerm()){ - break; - } - if (newEntry != null && newEntry.getTerm() != appendEntries - .getEntries().get(i).getTerm()) { - context.getReplicatedLog().removeFrom(i + 1); - break; - } - } - - // 4. Append any new entries not already in the log - for (int i = addEntriesFrom; - i < appendEntries.getEntries().size(); i++) { - context.getReplicatedLog() - .append(appendEntries.getEntries().get(i)); - } - } - - - // 5. If leaderCommit > commitIndex, set commitIndex = - // min(leaderCommit, index of last new entry) - context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), - context.getReplicatedLog().last().getIndex())); - - // If commitIndex > lastApplied: increment lastApplied, apply - // log[lastApplied] to state machine (§5.3) - if (appendEntries.getLeaderCommit() > context.getLastApplied()) { - applyLogToStateMachine(appendEntries.getLeaderCommit()); - } - - sender.tell(new AppendEntriesReply(currentTerm(), true), actor()); return handleAppendEntries(sender, appendEntries, raftState); } @@ -201,7 +167,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { if (requestVote.getLastLogTerm() > lastTerm()) { candidateLatest = true; } else if ((requestVote.getLastLogTerm() == lastTerm()) - && requestVote.getLastLogIndex() >= lastTerm()) { + && requestVote.getLastLogIndex() >= lastIndex()) { candidateLatest = true; } @@ -236,23 +202,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected abstract RaftState handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply, RaftState suggestedState); - /** - * @return The derived class should return the state that corresponds to - * it's behavior - */ - protected abstract RaftState state(); - protected FiniteDuration electionDuration() { long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS); } - protected void scheduleElection(FiniteDuration interval) { - + protected void stopElection() { if (electionCancel != null && !electionCancel.isCancelled()) { electionCancel.cancel(); } + } + + protected void scheduleElection(FiniteDuration interval) { + + stopElection(); // Schedule an election. When the scheduler triggers an ElectionTimeout // message is sent to itself @@ -275,13 +239,44 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } protected long lastTerm() { - return context.getReplicatedLog().last().getTerm(); + return context.getReplicatedLog().lastTerm(); } protected long lastIndex() { - return context.getReplicatedLog().last().getIndex(); + return context.getReplicatedLog().lastIndex(); } + protected ClientRequestTracker findClientRequestTracker(long logIndex) { + return null; + } + + protected void applyLogToStateMachine(long index) { + // Now maybe we apply to the state machine + for (long i = context.getLastApplied() + 1; + i < index + 1; i++) { + ActorRef clientActor = null; + String identifier = null; + ClientRequestTracker tracker = findClientRequestTracker(i); + + if (tracker != null) { + clientActor = tracker.getClientActor(); + identifier = tracker.getIdentifier(); + } + ReplicatedLogEntry replicatedLogEntry = + context.getReplicatedLog().get(i); + + if (replicatedLogEntry != null) { + actor().tell(new ApplyState(clientActor, identifier, + replicatedLogEntry), actor()); + } else { + context.getLogger().error( + "Missing index " + i + " from log. Cannot apply state."); + } + } + // Send a local message to the local RaftActor (it's derived class to be + // specific to apply the log to it's index) + context.setLastApplied(index); + } @Override public RaftState handleMessage(ActorRef sender, Object message) { @@ -307,6 +302,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return raftState; } + @Override public String getLeaderId() { + return leaderId; + } + private RaftState applyTerm(RaftRPC rpc) { // If RPC request or response contains term T > currentTerm: // set currentTerm = T, convert to follower (§5.1) @@ -318,11 +317,4 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return state(); } - private void applyLogToStateMachine(long index) { - // Send a local message to the local RaftActor (it's derived class to be - // specific to apply the log to it's index) - context.setLastApplied(index); - } - - }