- private void installSnapshotIfNeeded() {
- for (String followerId : followers) {
- ActorSelection followerActor =
- context.getPeerActorSelection(followerId);
-
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{} follower needs a snapshot install", followerId);
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot
- sendSnapshotChunk(followerActor, followerId);
-
- } else {
- initiateCaptureSnapshot();
- //we just need 1 follower who would need snapshot to be installed.
- // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
- // who needs an install and send to all who need
- break;
- }
-
- }
- }
- }
- }
-
- // on every install snapshot, we try to capture the snapshot.
- // Once a capture is going on, another one issued will get ignored by RaftActor.
- private void initiateCaptureSnapshot() {
- LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
- }
-
- boolean isInstallSnapshotInitiated = true;
- actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
- lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
- actor());
- }
-
-
- private void sendInstallSnapshot() {
- for (String followerId : followers) {
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
-
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, followerId);
- }
- }
- }
- }
-
- /**
- * Sends a snapshot chunk to a given follower
- * InstallSnapshot should qualify as a heartbeat too.
- */
- private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
- try {
- if (snapshot.isPresent()) {
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- getNextSnapshotChunk(followerId,snapshot.get()),
- mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks()
- ).toSerializable(),
- actor()
- );
- LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
- followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
- }
- } catch (IOException e) {
- LOG.error(e, "InstallSnapshot failed for Leader.");
- }
- }
-
- /**
- * Acccepts snaphot as ByteString, enters into map for future chunks
- * creates and return a ByteString chunk
- */
- private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- if (followerToSnapshot == null) {
- followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
- mapFollowerToSnapshot.put(followerId, followerToSnapshot);
- }
- ByteString nextChunk = followerToSnapshot.getNextChunk();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
- }
- return nextChunk;
- }