- private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
- String followerId = reply.getFollowerId();
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- followerLogInformation.markFollowerActive();
-
- if (followerToSnapshot != null &&
- followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
- if (reply.isSuccess()) {
- if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
- //this was the last chunk reply
- if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshotReply received, " +
- "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1
- );
- }
-
- followerLogInformation.setMatchIndex(
- context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation.setNextIndex(
- context.getReplicatedLog().getSnapshotIndex() + 1);
- mapFollowerToSnapshot.remove(followerId);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
- followerToLog.get(followerId).getNextIndex().get());
- }
-
- if (mapFollowerToSnapshot.isEmpty()) {
- // once there are no pending followers receiving snapshots
- // we can remove snapshot from the memory
- setSnapshot(Optional.<ByteString>absent());
- }
-
- } else {
- followerToSnapshot.markSendStatus(true);
- }
- } else {
- LOG.info("InstallSnapshotReply received, " +
- "sending snapshot chunk failed, Will retry, Chunk:{}",
- reply.getChunkIndex()
- );
- followerToSnapshot.markSendStatus(false);
- }
-
- } else {
- LOG.error("ERROR!!" +
- "FollowerId in InstallSnapshotReply not known to Leader" +
- " or Chunk Index in InstallSnapshotReply not matching {} != {}",
- followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
- );
- }
- }
-
- private void replicate(Replicate replicate) {
- long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Replicate message {}", logIndex);
- }
-
- // Create a tracker entry we will use this later to notify the
- // client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
- if (followers.size() == 0) {
- context.setCommitIndex(logIndex);
- applyLogToStateMachine(logIndex);
- } else {
- sendAppendEntries();