timer.start();
// Apply the snapshot to the actors state
- applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+ applyRecoverySnapshot(snapshot.getState());
timer.stop();
LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
snapshot.getLastAppliedTerm()
);
}
- applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+ applySnapshot(snapshot.getState());
//clears the followers log, sets the snapshot index to ensure adjusted-index works
replicatedLog = new ReplicatedLogImpl(snapshot);
} else if (message instanceof CaptureSnapshot) {
LOG.info("CaptureSnapshot received by actor");
- CaptureSnapshot cs = (CaptureSnapshot)message;
- captureSnapshot = cs;
- createSnapshot();
- } else if (message instanceof CaptureSnapshotReply){
- LOG.info("CaptureSnapshotReply received by actor");
- CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+ if(captureSnapshot == null) {
+ captureSnapshot = (CaptureSnapshot)message;
+ createSnapshot();
+ }
- ByteString stateInBytes = csr.getSnapshot();
- LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
- handleCaptureSnapshotReply(stateInBytes);
+ } else if (message instanceof CaptureSnapshotReply){
+ handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else {
if (!(message instanceof AppendEntriesMessages.AppendEntries)
*
* @param snapshot A snapshot of the state of the actor
*/
- protected abstract void applyRecoverySnapshot(ByteString snapshot);
+ protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
/**
* This method is called during recovery at the end of a batch to apply the current batched
* operations when the derived actor is out of sync with it's peers
* and the only way to bring it in sync is by applying a snapshot
*
- * @param snapshot A snapshot of the state of the actor
+ * @param snapshotBytes A snapshot of the state of the actor
*/
- protected abstract void applySnapshot(ByteString snapshot);
+ protected abstract void applySnapshot(byte[] snapshotBytes);
/**
* This method will be called by the RaftActor when the state of the
return peerAddress;
}
- private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+ private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
+ LOG.info("CaptureSnapshotReply received by actor: snapshot size {}", snapshotBytes.length);
+
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
- Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+ Snapshot sn = Snapshot.create(snapshotBytes,
context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
// this would be call straight to the leader and won't initiate in serialization
- currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+ currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
+ ByteString.copyFrom(snapshotBytes)));
}
captureSnapshot = null;