* This context should NOT be passed directly to any other actor it is
* only to be consumed by the RaftActorBehaviors
*/
- private RaftActorContext context;
+ protected RaftActorContext context;
/**
* The in-memory journal
context.setReplicatedLog(replicatedLog);
context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.setCommitIndex(snapshot.getLastAppliedIndex());
LOG.info("Applied snapshot to replicatedLog. " +
"snapshotIndex={}, snapshotTerm={}, journal-size={}",
applyState(null, "recovery", logEntry.getData());
context.setLastApplied(logEntry.getIndex());
context.setCommitIndex(logEntry.getIndex());
+
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+
} else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+ ((UpdateElectionTerm) message).getVotedFor());
+
} else if (message instanceof RecoveryCompleted) {
- LOG.debug(
+ LOG.info(
"RecoveryCompleted - Switching actor to Follower - " +
"Persistence Id = " + persistenceId() +
" Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
- LOG.debug("Applying state for log index {} data {}",
- applyState.getReplicatedLogEntry().getIndex(),
- applyState.getReplicatedLogEntry().getData());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Applying state for log index {} data {}",
+ applyState.getReplicatedLogEntry().getIndex(),
+ applyState.getReplicatedLogEntry().getData());
+ }
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
} else if(message instanceof ApplySnapshot ) {
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
- LOG.debug("ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
- snapshot.getLastAppliedTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+ snapshot.getLastAppliedTerm()
+ );
+ }
applySnapshot(ByteString.copyFrom(snapshot.getState()));
//clears the followers log, sets the snapshot index to ensure adjusted-index works
} else {
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
- LOG.debug("onReceiveCommand: message:" + message.getClass());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("onReceiveCommand: message:" + message.getClass());
+ }
}
RaftState state =
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
- LOG.debug("Persist data {}", replicatedLogEntry);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Persist data {}", replicatedLogEntry);
+ }
replicatedLog
.appendAndPersist(clientActor, identifier, replicatedLogEntry);
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
- LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
- + peerAddress);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
+ + peerAddress);
+ }
return peerAddress;
}
lastAppliedTerm = lastAppliedEntry.getTerm();
}
- LOG.debug("Snapshot Capture logSize: {}", journal.size());
- LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied());
- LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
- LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot Capture logSize: {}", journal.size());
+ LOG.debug("Snapshot Capture lastApplied:{} ",
+ context.getLastApplied());
+ LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
+ LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ }
// send a CaptureSnapshot to self to make the expensive operation async.
getSelf().tell(new CaptureSnapshot(
}
@Override public void update(long currentTerm, String votedFor) {
- LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+ }
this.currentTerm = currentTerm;
this.votedFor = votedFor;
}