package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import java.util.ArrayList;
+
/**
* The behavior of a RaftActor in the Follower state
* <p/>
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
+ private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
public Follower(RaftActorContext context) {
super(context);
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
context.getLogger()
- .info("Follower: Received {}", appendEntries.toString());
+ .debug(appendEntries.toString());
}
// TODO : Refactor this method into a bunch of smaller methods
if (outOfSync) {
// We found that the log was out of sync so just send a negative
// reply and return
+ context.getLogger().debug("Follower is out-of-sync, " +
+ "so sending negative reply, lastIndex():{}, lastTerm():{}",
+ lastIndex(), lastTerm());
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
// If commitIndex > lastApplied: increment lastApplied, apply
// log[lastApplied] to state machine (ยง5.3)
- if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+ // check if there are any entries to be applied. last-applied can be equal to last-index
+ if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
+ context.getLastApplied() < lastIndex()) {
+ context.getLogger().debug("applyLogToStateMachine, " +
+ "appendEntries.getLeaderCommit():{}," +
+ "context.getLastApplied():{}, lastIndex():{}",
+ appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
applyLogToStateMachine(appendEntries.getLeaderCommit());
}
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
- actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
+ handleInstallSnapshot(sender, installSnapshot);
}
scheduleElection(electionDuration());
return super.handleMessage(sender, message);
}
+ private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+ context.getLogger().debug("InstallSnapshot received by follower " +
+ "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+ installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+ try {
+ if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
+ // this is the last chunk, create a snapshot object and apply
+
+ snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+ context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
+ snapshotChunksCollected.size());
+
+ Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
+ new ArrayList<ReplicatedLogEntry>(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm());
+
+ actor().tell(new ApplySnapshot(snapshot), actor());
+
+ } else {
+ // we have more to go
+ snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+ context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
+ installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+ }
+
+ sender.tell(new InstallSnapshotReply(
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+ true), actor());
+
+ } catch (Exception e) {
+ context.getLogger().error("Exception in InstallSnapshot of follower", e);
+ //send reply with success as false. The chunk will be sent again on failure
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+ installSnapshot.getChunkIndex(), false), actor());
+ }
+ }
+
@Override public void close() throws Exception {
stopElection();
}