+ private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+
+ LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
+ logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
+ installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+ if(snapshotTracker == null){
+ snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+ }
+
+ updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
+
+ try {
+ if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+ installSnapshot.getLastChunkHashCode())){
+ Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+ new ArrayList<ReplicatedLogEntry>(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm());
+
+ actor().tell(new ApplySnapshot(snapshot), actor());
+
+ snapshotTracker = null;
+
+ }
+
+ InstallSnapshotReply reply = new InstallSnapshotReply(
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+
+ LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+
+ sender.tell(reply, actor());
+
+ } catch (SnapshotTracker.InvalidChunkException e) {
+ LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
+
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+ -1, false), actor());
+ snapshotTracker = null;
+
+ } catch (Exception e){
+ LOG.error("{}: Exception in InstallSnapshot of follower", logName(), e);
+
+ //send reply with success as false. The chunk will be sent again on failure
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+ installSnapshot.getChunkIndex(), false), actor());
+
+ }
+ }
+
+ @Override
+ public void close() throws Exception {