package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
+
+ private SnapshotTracker snapshotTracker = null;
+
public Follower(RaftActorContext context) {
super(context);
scheduleElection(electionDuration());
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries) {
+ private boolean isLogEntryPresent(long index){
+ if(index == context.getReplicatedLog().getSnapshotIndex()){
+ return true;
+ }
+
+ ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+ .get(index);
+
+ return previousEntry != null;
+
+ }
+
+ private long getLogEntryTerm(long index){
+ if(index == context.getReplicatedLog().getSnapshotIndex()){
+ return context.getReplicatedLog().getSnapshotTerm();
+ }
+
+ ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+ .get(index);
+
+ if(previousEntry != null){
+ return previousEntry.getTerm();
+ }
+
+ return -1;
+ }
+
+ @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries) {
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
- context.getLogger()
- .info("Follower: Received {}", appendEntries.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
+ }
}
// TODO : Refactor this method into a bunch of smaller methods
// 2. Reply false if log doesn’t contain an entry at prevLogIndex
// whose term matches prevLogTerm (§5.3)
- ReplicatedLogEntry previousEntry = context.getReplicatedLog()
- .get(appendEntries.getPrevLogIndex());
+ long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
+ boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
boolean outOfSync = true;
// First check if the logs are in sync or not
if (lastIndex() == -1
- && appendEntries.getPrevLogIndex() != -1) {
+ && appendEntries.getPrevLogIndex() != -1) {
// The follower's log is out of sync because the leader does have
// an entry at prevLogIndex and this follower has no entries in
// it's log.
- context.getLogger().debug(
- "The followers log is empty and the senders prevLogIndex is {}",
- appendEntries.getPrevLogIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+ context.getId(), appendEntries.getPrevLogIndex());
+ }
} else if (lastIndex() > -1
- && appendEntries.getPrevLogIndex() != -1
- && previousEntry == null) {
+ && appendEntries.getPrevLogIndex() != -1
+ && !prevEntryPresent) {
// The follower's log is out of sync because the Leader's
// prevLogIndex entry was not found in it's log
- context.getLogger().debug(
- "The log is not empty but the prevLogIndex {} was not found in it",
- appendEntries.getPrevLogIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
+ context.getId(), appendEntries.getPrevLogIndex());
+ }
} else if (lastIndex() > -1
- && previousEntry != null
- && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
+ && prevEntryPresent
+ && prevLogTerm != appendEntries.getPrevLogTerm()) {
// The follower's log is out of sync because the Leader's
// prevLogIndex entry does exist in the follower's log but it has
// a different term in it
- context.getLogger().debug(
- "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
- , previousEntry.getTerm()
- , appendEntries.getPrevLogTerm());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "{}: Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , context.getId(), prevLogTerm
+ , appendEntries.getPrevLogTerm());
+ }
} else {
outOfSync = false;
}
if (outOfSync) {
// We found that the log was out of sync so just send a negative
// reply and return
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Follower ({}) is out-of-sync, " +
+ "so sending negative reply, lastIndex():{}, lastTerm():{}",
+ context.getId(), context.getId(), lastIndex(), lastTerm()
+ );
+ }
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
);
- return state();
+ return this;
}
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
- context.getLogger().debug(
- "Number of entries to be appended = " + appendEntries
- .getEntries().size()
- );
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Number of entries to be appended = {}", context.getId(),
+ appendEntries.getEntries().size());
+ }
// 3. If an existing entry conflicts with a new one (same index
// but different terms), delete the existing entry and all that
int addEntriesFrom = 0;
if (context.getReplicatedLog().size() > 0) {
- // Find the entry up until which the one that is not in the
- // follower's log
- for (int i = 0;
- i < appendEntries.getEntries()
- .size(); i++, addEntriesFrom++) {
- ReplicatedLogEntry matchEntry =
- appendEntries.getEntries().get(i);
- ReplicatedLogEntry newEntry = context.getReplicatedLog()
- .get(matchEntry.getIndex());
+ // Find the entry up until which the one that is not in the follower's log
+ for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
+ ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
+ ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
if (newEntry == null) {
//newEntry not found in the log
continue;
}
- context.getLogger().debug(
- "Removing entries from log starting at "
- + matchEntry.getIndex()
- );
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Removing entries from log starting at {}", context.getId(),
+ matchEntry.getIndex());
+ }
// Entries do not match so remove all subsequent entries
context.getReplicatedLog()
}
}
- context.getLogger().debug(
- "After cleanup entries to be added from = " + (addEntriesFrom
- + lastIndex())
- );
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: After cleanup entries to be added from = {}", context.getId(),
+ (addEntriesFrom + lastIndex()));
+ }
// 4. Append any new entries not already in the log
for (int i = addEntriesFrom;
i < appendEntries.getEntries().size(); i++) {
- context.getLogger().info(
- "Append entry to log " + appendEntries.getEntries().get(
- i).getData()
- .toString()
- );
- context.getReplicatedLog()
- .appendAndPersist(appendEntries.getEntries().get(i));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Append entry to log {}", context.getId(),
+ appendEntries.getEntries().get(i).getData());
+ }
+ context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
}
- context.getLogger().debug(
- "Log size is now " + context.getReplicatedLog().size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Log size is now {}", context.getId(), context.getReplicatedLog().size());
+ }
}
context.getReplicatedLog().lastIndex()));
if (prevCommitIndex != context.getCommitIndex()) {
- context.getLogger()
- .debug("Commit index set to " + context.getCommitIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Commit index set to {}", context.getId(), context.getCommitIndex());
+ }
}
// If commitIndex > lastApplied: increment lastApplied, apply
// log[lastApplied] to state machine (§5.3)
- if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+ // check if there are any entries to be applied. last-applied can be equal to last-index
+ if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
+ context.getLastApplied() < lastIndex()) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: applyLogToStateMachine, " +
+ "appendEntries.getLeaderCommit():{}," +
+ "context.getLastApplied():{}, lastIndex():{}", context.getId(),
+ appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
+ );
+ }
+
applyLogToStateMachine(appendEntries.getLeaderCommit());
}
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm()), actor());
- return state();
+ if (!context.isSnapshotCaptureInitiated()) {
+ fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+ }
+
+ return this;
}
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
- return state();
+ return this;
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
- return state();
+ return this;
}
@Override public RaftState state() {
return RaftState.Follower;
}
- @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Object message = fromSerializableMessage(originalMessage);
}
if (message instanceof ElectionTimeout) {
- return RaftState.Candidate;
+ return switchBehavior(new Candidate(context));
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
- actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
+ handleInstallSnapshot(sender, installSnapshot);
}
scheduleElection(electionDuration());
return super.handleMessage(sender, message);
}
+ private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: InstallSnapshot received by follower " +
+ "datasize:{} , Chunk:{}/{}", context.getId(), installSnapshot.getData().size(),
+ installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
+ );
+ }
+
+ if(snapshotTracker == null){
+ snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+ }
+
+ try {
+ if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+ installSnapshot.getLastChunkHashCode())){
+ Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+ new ArrayList<ReplicatedLogEntry>(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm());
+
+ actor().tell(new ApplySnapshot(snapshot), actor());
+
+ snapshotTracker = null;
+
+ }
+
+ sender.tell(new InstallSnapshotReply(
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+ true), actor());
+
+ } catch (SnapshotTracker.InvalidChunkException e) {
+
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+ -1, false), actor());
+ snapshotTracker = null;
+
+ } catch (Exception e){
+ LOG.error("{}: Exception in InstallSnapshot of follower", context.getId(), e);
+ //send reply with success as false. The chunk will be sent again on failure
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+ installSnapshot.getChunkIndex(), false), actor());
+
+ }
+ }
+
@Override public void close() throws Exception {
stopElection();
}
+
+ @VisibleForTesting
+ ByteString getSnapshotChunksCollected(){
+ return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY;
+ }
+
+
}