+ private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("InstallSnapshot received by follower " +
+ "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+ installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
+ );
+ }
+
+ try {
+ if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
+ // this is the last chunk, create a snapshot object and apply
+
+ snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
+ snapshotChunksCollected.size());
+ }
+
+ Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
+ new ArrayList<ReplicatedLogEntry>(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm());
+
+ actor().tell(new ApplySnapshot(snapshot), actor());
+
+ } else {
+ // we have more to go
+ snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
+ installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+ }
+ }
+
+ sender.tell(new InstallSnapshotReply(
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+ true), actor());
+
+ } catch (Exception e) {
+ LOG.error(e, "Exception in InstallSnapshot of follower:");
+ //send reply with success as false. The chunk will be sent again on failure
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+ installSnapshot.getChunkIndex(), false), actor());
+ }
+ }
+