onRecoveryComplete();
- RaftActorBehavior oldBehavior = currentBehavior;
- currentBehavior = new Follower(context);
- handleBehaviorChange(oldBehavior, currentBehavior);
+ initializeBehavior();
}
}
}
timer.start();
// Apply the snapshot to the actors state
- applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+ applyRecoverySnapshot(snapshot.getState());
timer.stop();
LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
replicatedLog.size(), persistenceId(), timer.toString(),
- replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+ replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
}
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
"Persistence Id = " + persistenceId() +
" Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
"journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
- replicatedLog.snapshotTerm, replicatedLog.size());
+ replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
+ replicatedLog.getSnapshotTerm(), replicatedLog.size());
+ initializeBehavior();
+ }
+
+ protected void initializeBehavior(){
+ changeCurrentBehavior(new Follower(context));
+ }
+
+ protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
RaftActorBehavior oldBehavior = currentBehavior;
- currentBehavior = new Follower(context);
+ currentBehavior = newBehavior;
handleBehaviorChange(oldBehavior, currentBehavior);
}
snapshot.getLastAppliedTerm()
);
}
- applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+ applySnapshot(snapshot.getState());
//clears the followers log, sets the snapshot index to ensure adjusted-index works
replicatedLog = new ReplicatedLogImpl(snapshot);
} else if (message instanceof CaptureSnapshot) {
LOG.info("CaptureSnapshot received by actor");
- CaptureSnapshot cs = (CaptureSnapshot)message;
- captureSnapshot = cs;
- createSnapshot();
- } else if (message instanceof CaptureSnapshotReply){
- LOG.info("CaptureSnapshotReply received by actor");
- CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+ if(captureSnapshot == null) {
+ captureSnapshot = (CaptureSnapshot)message;
+ createSnapshot();
+ }
- ByteString stateInBytes = csr.getSnapshot();
- LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
- handleCaptureSnapshotReply(stateInBytes);
+ } else if (message instanceof CaptureSnapshotReply){
+ handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else {
if (!(message instanceof AppendEntriesMessages.AppendEntries)
if (oldBehavior != currentBehavior){
onStateChanged();
}
- if (oldBehavior != null) {
- // it can happen that the state has not changed but the leader has changed.
- onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
- if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) {
- // we do not want to notify when the behavior/role is set for the first time (i.e follower)
- getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(),
- currentBehavior.state().name()), getSelf());
- }
+ String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
+ String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
+
+ // it can happen that the state has not changed but the leader has changed.
+ onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+
+ if (getRoleChangeNotifier().isPresent() &&
+ (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
+ getRoleChangeNotifier().get().tell(
+ new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
+ getSelf());
}
}
*
* @param snapshot A snapshot of the state of the actor
*/
- protected abstract void applyRecoverySnapshot(ByteString snapshot);
+ protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
/**
* This method is called during recovery at the end of a batch to apply the current batched
* operations when the derived actor is out of sync with it's peers
* and the only way to bring it in sync is by applying a snapshot
*
- * @param snapshot A snapshot of the state of the actor
+ * @param snapshotBytes A snapshot of the state of the actor
*/
- protected abstract void applySnapshot(ByteString snapshot);
+ protected abstract void applySnapshot(byte[] snapshotBytes);
/**
* This method will be called by the RaftActor when the state of the
return peerAddress;
}
- private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+ private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
+ LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
- Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+ Snapshot sn = Snapshot.create(snapshotBytes,
context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
// this would be call straight to the leader and won't initiate in serialization
- currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+ currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
+ ByteString.copyFrom(snapshotBytes)));
}
captureSnapshot = null;