package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
-import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
+import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import java.util.ArrayList;
-
/**
* The behavior of a RaftActor in the Follower state
* <p/>
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
- private ByteString snapshotChunksCollected = ByteString.EMPTY;
- private final LoggingAdapter LOG;
+ private SnapshotTracker snapshotTracker = null;
public Follower(RaftActorContext context) {
super(context);
- LOG = context.getLogger();
-
scheduleElection(electionDuration());
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
);
- return state();
+ return this;
}
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
LOG.debug(
- "Number of entries to be appended = " + appendEntries
- .getEntries().size()
+ "Number of entries to be appended = {}", appendEntries.getEntries().size()
);
}
if(LOG.isDebugEnabled()) {
LOG.debug(
- "Removing entries from log starting at "
- + matchEntry.getIndex()
+ "Removing entries from log starting at {}", matchEntry.getIndex()
);
}
}
if(LOG.isDebugEnabled()) {
- context.getLogger().debug(
- "After cleanup entries to be added from = " + (addEntriesFrom
- + lastIndex())
+ LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
);
}
for (int i = addEntriesFrom;
i < appendEntries.getEntries().size(); i++) {
- context.getLogger().info(
- "Append entry to log " + appendEntries.getEntries().get(
- i).getData()
- .toString()
- );
- context.getReplicatedLog()
- .appendAndPersist(appendEntries.getEntries().get(i));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+ }
+ context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Log size is now " + context.getReplicatedLog().size());
+ LOG.debug("Log size is now {}", context.getReplicatedLog().size());
}
}
if (prevCommitIndex != context.getCommitIndex()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Commit index set to " + context.getCommitIndex());
+ LOG.debug("Commit index set to {}", context.getCommitIndex());
}
}
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm()), actor());
- return state();
+ return this;
}
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
- return state();
+ return this;
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
- return state();
+ return this;
}
@Override public RaftState state() {
return RaftState.Follower;
}
- @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Object message = fromSerializableMessage(originalMessage);
}
if (message instanceof ElectionTimeout) {
- return RaftState.Candidate;
+ return switchBehavior(new Candidate(context));
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
);
}
- try {
- if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
- // this is the last chunk, create a snapshot object and apply
-
- snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
- context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
- snapshotChunksCollected.size());
+ if(snapshotTracker == null){
+ snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+ }
- Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
- new ArrayList<ReplicatedLogEntry>(),
- installSnapshot.getLastIncludedIndex(),
- installSnapshot.getLastIncludedTerm(),
- installSnapshot.getLastIncludedIndex(),
- installSnapshot.getLastIncludedTerm());
+ try {
+ if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+ installSnapshot.getLastChunkHashCode())){
+ Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+ new ArrayList<ReplicatedLogEntry>(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm());
actor().tell(new ApplySnapshot(snapshot), actor());
- } else {
- // we have more to go
- snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+ snapshotTracker = null;
- if(LOG.isDebugEnabled()) {
- LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
- installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
- }
}
sender.tell(new InstallSnapshotReply(
- currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
- true), actor());
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+ true), actor());
+
+ } catch (SnapshotTracker.InvalidChunkException e) {
+
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+ -1, false), actor());
+ snapshotTracker = null;
+
+ } catch (Exception e){
- } catch (Exception e) {
- context.getLogger().error("Exception in InstallSnapshot of follower", e);
+ LOG.error(e, "Exception in InstallSnapshot of follower:");
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
- installSnapshot.getChunkIndex(), false), actor());
+ installSnapshot.getChunkIndex(), false), actor());
+
}
}
@Override public void close() throws Exception {
stopElection();
}
+
+ @VisibleForTesting
+ ByteString getSnapshotChunksCollected(){
+ return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY;
+ }
+
+
}