scheduleElection(electionDuration());
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
- context.getLogger()
- .debug(appendEntries.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(appendEntries.toString());
+ }
}
// TODO : Refactor this method into a bunch of smaller methods
// an entry at prevLogIndex and this follower has no entries in
// it's log.
- context.getLogger().debug(
- "The followers log is empty and the senders prevLogIndex is {}",
- appendEntries.getPrevLogIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
+ appendEntries.getPrevLogIndex());
+ }
} else if (lastIndex() > -1
&& appendEntries.getPrevLogIndex() != -1
// The follower's log is out of sync because the Leader's
// prevLogIndex entry was not found in it's log
- context.getLogger().debug(
- "The log is not empty but the prevLogIndex {} was not found in it",
- appendEntries.getPrevLogIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
+ appendEntries.getPrevLogIndex());
+ }
} else if (lastIndex() > -1
&& previousEntry != null
// prevLogIndex entry does exist in the follower's log but it has
// a different term in it
- context.getLogger().debug(
- "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
- , previousEntry.getTerm()
- , appendEntries.getPrevLogTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , previousEntry.getTerm()
+ , appendEntries.getPrevLogTerm());
+ }
} else {
outOfSync = false;
}
if (outOfSync) {
// We found that the log was out of sync so just send a negative
// reply and return
- context.getLogger().debug("Follower is out-of-sync, " +
- "so sending negative reply, lastIndex():{}, lastTerm():{}",
- lastIndex(), lastTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Follower is out-of-sync, " +
+ "so sending negative reply, lastIndex():{}, lastTerm():{}",
+ lastIndex(), lastTerm()
+ );
+ }
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
);
- return state();
+ return this;
}
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
- context.getLogger().debug(
- "Number of entries to be appended = " + appendEntries
- .getEntries().size()
- );
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Number of entries to be appended = {}", appendEntries.getEntries().size()
+ );
+ }
// 3. If an existing entry conflicts with a new one (same index
// but different terms), delete the existing entry and all that
continue;
}
- context.getLogger().debug(
- "Removing entries from log starting at "
- + matchEntry.getIndex()
- );
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Removing entries from log starting at {}", matchEntry.getIndex()
+ );
+ }
// Entries do not match so remove all subsequent entries
context.getReplicatedLog()
}
}
- context.getLogger().debug(
- "After cleanup entries to be added from = " + (addEntriesFrom
- + lastIndex())
- );
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
+ );
+ }
// 4. Append any new entries not already in the log
for (int i = addEntriesFrom;
i < appendEntries.getEntries().size(); i++) {
- context.getLogger().info(
- "Append entry to log " + appendEntries.getEntries().get(
- i).getData()
- .toString()
- );
- context.getReplicatedLog()
- .appendAndPersist(appendEntries.getEntries().get(i));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+ }
+ context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
}
- context.getLogger().debug(
- "Log size is now " + context.getReplicatedLog().size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Log size is now {}", context.getReplicatedLog().size());
+ }
}
context.getReplicatedLog().lastIndex()));
if (prevCommitIndex != context.getCommitIndex()) {
- context.getLogger()
- .debug("Commit index set to " + context.getCommitIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Commit index set to {}", context.getCommitIndex());
+ }
}
// If commitIndex > lastApplied: increment lastApplied, apply
// check if there are any entries to be applied. last-applied can be equal to last-index
if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
context.getLastApplied() < lastIndex()) {
- context.getLogger().debug("applyLogToStateMachine, " +
- "appendEntries.getLeaderCommit():{}," +
- "context.getLastApplied():{}, lastIndex():{}",
- appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("applyLogToStateMachine, " +
+ "appendEntries.getLeaderCommit():{}," +
+ "context.getLastApplied():{}, lastIndex():{}",
+ appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
+ );
+ }
+
applyLogToStateMachine(appendEntries.getLeaderCommit());
}
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm()), actor());
- return state();
+ return this;
}
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
- return state();
+ return this;
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
- return state();
+ return this;
}
@Override public RaftState state() {
return RaftState.Follower;
}
- @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Object message = fromSerializableMessage(originalMessage);
}
if (message instanceof ElectionTimeout) {
- return RaftState.Candidate;
+ return switchBehavior(new Candidate(context));
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
}
private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
- context.getLogger().debug("InstallSnapshot received by follower " +
- "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
- installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("InstallSnapshot received by follower " +
+ "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+ installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
+ );
+ }
try {
if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
// this is the last chunk, create a snapshot object and apply
snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
- context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
- snapshotChunksCollected.size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
+ snapshotChunksCollected.size());
+ }
Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
new ArrayList<ReplicatedLogEntry>(),
} else {
// we have more to go
snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
- context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
- installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
+ installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+ }
}
sender.tell(new InstallSnapshotReply(
true), actor());
} catch (Exception e) {
- context.getLogger().error("Exception in InstallSnapshot of follower", e);
+ LOG.error(e, "Exception in InstallSnapshot of follower:");
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
installSnapshot.getChunkIndex(), false), actor());