+ // on every install snapshot, we try to capture the snapshot.
+ // Once a capture is going on, another one issued will get ignored by RaftActor.
+ private void initiateCaptureSnapshot() {
+ LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+ ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
+ lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
+ lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
+ }
+
+ boolean isInstallSnapshotInitiated = true;
+ actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
+ lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
+ actor());
+ }
+
+
+ private void sendInstallSnapshot() {
+ for (String followerId : followers) {
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+ if(followerActor != null) {
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ if (!context.getReplicatedLog().isPresent(nextIndex) &&
+ context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ sendSnapshotChunk(followerActor, followerId);
+ }
+ }
+ }
+ }
+
+ /**
+ * Sends a snapshot chunk to a given follower
+ * InstallSnapshot should qualify as a heartbeat too.
+ */
+ private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
+ try {
+ if (snapshot.isPresent()) {
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ getNextSnapshotChunk(followerId,snapshot.get()),
+ mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks()
+ ).toSerializable(),
+ actor()
+ );
+ LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ }
+ } catch (IOException e) {
+ LOG.error(e, "InstallSnapshot failed for Leader.");
+ }
+ }
+
+ /**
+ * Acccepts snaphot as ByteString, enters into map for future chunks
+ * creates and return a ByteString chunk
+ */
+ private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ if (followerToSnapshot == null) {
+ followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+ mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+ }
+ ByteString nextChunk = followerToSnapshot.getNextChunk();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+ }
+ return nextChunk;
+ }
+
+ private void sendHeartBeat() {