@Override public void onReceiveRecover(Object message) {
if (message instanceof SnapshotOffer) {
- LOG.debug("SnapshotOffer called..");
+ LOG.info("SnapshotOffer called..");
SnapshotOffer offer = (SnapshotOffer) message;
Snapshot snapshot = (Snapshot) offer.snapshot();
context.setReplicatedLog(replicatedLog);
context.setLastApplied(snapshot.getLastAppliedIndex());
- LOG.debug("Applied snapshot to replicatedLog. " +
- "snapshotIndex={}, snapshotTerm={}, journal-size={}",
+ LOG.info("Applied snapshot to replicatedLog. " +
+ "snapshotIndex={}, snapshotTerm={}, journal-size={}",
replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
- replicatedLog.size());
+ replicatedLog.size()
+ );
// Apply the snapshot to the actors state
applySnapshot(ByteString.copyFrom(snapshot.getState()));
} else if (message instanceof ReplicatedLogEntry) {
- replicatedLog.append((ReplicatedLogEntry) message);
+ ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
+
+ // Apply State immediately
+ replicatedLog.append(logEntry);
+ applyState(null, "recovery", logEntry.getData());
+ context.setLastApplied(logEntry.getIndex());
+ context.setCommitIndex(logEntry.getIndex());
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
} else if (message instanceof UpdateElectionTerm) {
} else if (message instanceof RecoveryCompleted) {
LOG.debug(
"RecoveryCompleted - Switching actor to Follower - " +
- "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+ "Persistence Id = " + persistenceId() +
+ " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
"journal-size={}",
replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
replicatedLog.snapshotTerm, replicatedLog.size());
context.removePeer(rrp.getName());
} else if (message instanceof CaptureSnapshot) {
- LOG.debug("CaptureSnapshot received by actor");
+ LOG.info("CaptureSnapshot received by actor");
CaptureSnapshot cs = (CaptureSnapshot)message;
captureSnapshot = cs;
createSnapshot();
} else if (message instanceof CaptureSnapshotReply){
- LOG.debug("CaptureSnapshotReply received by actor");
+ LOG.info("CaptureSnapshotReply received by actor");
CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
ByteString stateInBytes = csr.getSnapshot();
- LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+ LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
handleCaptureSnapshotReply(stateInBytes);
} else {
if(oldBehavior != currentBehavior){
onStateChanged();
}
+
+ onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
}
}
protected ActorSelection getLeader(){
String leaderAddress = getLeaderAddress();
+ if(leaderAddress == null){
+ return null;
+ }
+
return context.actorSelection(leaderAddress);
}
*/
protected abstract void onStateChanged();
+ protected void onLeaderChanged(String oldLeader, String newLeader){};
+
private RaftActorBehavior switchBehavior(RaftState state) {
if (currentBehavior != null) {
if (currentBehavior.state() == state) {