Bug-2692: Avoid a fake snapshot while a snapshot capture is being initiated
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.ImmutableMap;
18 import com.google.common.collect.ImmutableMap.Builder;
19 import com.google.protobuf.ByteString;
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.concurrent.TimeUnit;
30 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
31 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
32 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
33 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
34 import org.opendaylight.controller.cluster.raft.RaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
37 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
40 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
41 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
43 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
45 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
46 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
47 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
48 import scala.concurrent.duration.FiniteDuration;
49
50 /**
51  * The behavior of a RaftActor when it is in the Leader state
52  * <p/>
53  * Leaders:
54  * <ul>
55  * <li> Upon election: send initial empty AppendEntries RPCs
56  * (heartbeat) to each server; repeat during idle periods to
57  * prevent election timeouts (§5.2)
58  * <li> If command received from client: append entry to local log,
59  * respond after entry applied to state machine (§5.3)
60  * <li> If last log index ≥ nextIndex for a follower: send
61  * AppendEntries RPC with log entries starting at nextIndex
62  * <ul>
63  * <li> If successful: update nextIndex and matchIndex for
64  * follower (§5.3)
65  * <li> If AppendEntries fails because of log inconsistency:
66  * decrement nextIndex and retry (§5.3)
67  * </ul>
68  * <li> If there exists an N such that N > commitIndex, a majority
69  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
70  * set commitIndex = N (§5.3, §5.4).
71  */
72 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
73
74     // The index of the first chunk that is sent when installing a snapshot
75     public static final int FIRST_CHUNK_INDEX = 1;
76
77     // The index that the follower should respond with if it needs the install snapshot to be reset
78     public static final int INVALID_CHUNK_INDEX = -1;
79
80     // This would be passed as the hash code of the last chunk when sending the first chunk
81     public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
82
83     private final Map<String, FollowerLogInformation> followerToLog;
84     private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
85
86     private Cancellable heartbeatSchedule = null;
87
88     private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
89
90     protected final int minReplicationCount;
91
92     protected final int minIsolatedLeaderPeerCount;
93
94     private Optional<ByteString> snapshot;
95
96     private long replicatedToAllIndex = -1;
97
98     public AbstractLeader(RaftActorContext context) {
99         super(context);
100
101         final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
102         for (String followerId : context.getPeerAddresses().keySet()) {
103             FollowerLogInformation followerLogInformation =
104                 new FollowerLogInformationImpl(followerId,
105                     context.getCommitIndex(), -1,
106                     context.getConfigParams().getElectionTimeOutInterval());
107
108             ftlBuilder.put(followerId, followerLogInformation);
109         }
110         followerToLog = ftlBuilder.build();
111
112         leaderId = context.getId();
113
114         LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
115
116         minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
117
118         // the isolated Leader peer count will be 1 less than the majority vote count.
119         // this is because the vote count has the self vote counted in it
120         // for e.g
121         // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
122         // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
123         // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
124         minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
125
126         snapshot = Optional.absent();
127
128         // Immediately schedule a heartbeat
129         // Upon election: send initial empty AppendEntries RPCs
130         // (heartbeat) to each server; repeat during idle periods to
131         // prevent election timeouts (§5.2)
132         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
133     }
134
135     /**
136      * Return an immutable collection of follower identifiers.
137      *
138      * @return Collection of follower IDs
139      */
140     protected final Collection<String> getFollowerIds() {
141         return followerToLog.keySet();
142     }
143
144     private Optional<ByteString> getSnapshot() {
145         return snapshot;
146     }
147
148     @VisibleForTesting
149     void setSnapshot(Optional<ByteString> snapshot) {
150         this.snapshot = snapshot;
151     }
152
153     @Override
154     protected RaftActorBehavior handleAppendEntries(ActorRef sender,
155         AppendEntries appendEntries) {
156
157         if(LOG.isDebugEnabled()) {
158             LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
159         }
160
161         return this;
162     }
163
164     @Override
165     protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
166         AppendEntriesReply appendEntriesReply) {
167
168         if(! appendEntriesReply.isSuccess()) {
169             if(LOG.isDebugEnabled()) {
170                 LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
171             }
172         }
173
174         // Update the FollowerLogInformation
175         String followerId = appendEntriesReply.getFollowerId();
176         FollowerLogInformation followerLogInformation =
177             followerToLog.get(followerId);
178
179         if(followerLogInformation == null){
180             LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
181             return this;
182         }
183
184         followerLogInformation.markFollowerActive();
185
186         if (appendEntriesReply.isSuccess()) {
187             followerLogInformation
188                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
189             followerLogInformation
190                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
191         } else {
192
193             // TODO: When we find that the follower is out of sync with the
194             // Leader we simply decrement that followers next index by 1.
195             // Would it be possible to do better than this? The RAFT spec
196             // does not explicitly deal with it but may be something for us to
197             // think about
198
199             followerLogInformation.decrNextIndex();
200         }
201
202         // Now figure out if this reply warrants a change in the commitIndex
203         // If there exists an N such that N > commitIndex, a majority
204         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
205         // set commitIndex = N (§5.3, §5.4).
206         for (long N = context.getCommitIndex() + 1; ; N++) {
207             int replicatedCount = 1;
208
209             for (FollowerLogInformation info : followerToLog.values()) {
210                 if (info.getMatchIndex() >= N) {
211                     replicatedCount++;
212                 }
213             }
214
215             if (replicatedCount >= minReplicationCount) {
216                 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
217                 if (replicatedLogEntry != null &&
218                     replicatedLogEntry.getTerm() == currentTerm()) {
219                     context.setCommitIndex(N);
220                 }
221             } else {
222                 break;
223             }
224         }
225
226         // Apply the change to the state machine
227         if (context.getCommitIndex() > context.getLastApplied()) {
228             applyLogToStateMachine(context.getCommitIndex());
229         }
230
231         if (!context.isSnapshotCaptureInitiated()) {
232             purgeInMemoryLog();
233         }
234
235         return this;
236     }
237
238     private void purgeInMemoryLog() {
239         //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
240         // we would delete the in-mem log from that index on, in-order to minimize mem usage
241         // we would also share this info thru AE with the followers so that they can delete their log entries as well.
242         long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
243         for (FollowerLogInformation info : followerToLog.values()) {
244             minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
245         }
246
247         replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
248     }
249
250     @Override
251     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
252         final Iterator<ClientRequestTracker> it = trackerList.iterator();
253         while (it.hasNext()) {
254             final ClientRequestTracker t = it.next();
255             if (t.getIndex() == logIndex) {
256                 it.remove();
257                 return t;
258             }
259         }
260
261         return null;
262     }
263
264     @Override
265     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
266         for (ClientRequestTracker tracker : trackerList) {
267             if (tracker.getIndex() == logIndex) {
268                 return tracker;
269             }
270         }
271         return null;
272     }
273
274     @Override
275     protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
276         RequestVoteReply requestVoteReply) {
277         return this;
278     }
279
280     @Override
281     public RaftState state() {
282         return RaftState.Leader;
283     }
284
285     @Override
286     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
287         Preconditions.checkNotNull(sender, "sender should not be null");
288
289         Object message = fromSerializableMessage(originalMessage);
290
291         if (message instanceof RaftRPC) {
292             RaftRPC rpc = (RaftRPC) message;
293             // If RPC request or response contains term T > currentTerm:
294             // set currentTerm = T, convert to follower (§5.1)
295             // This applies to all RPC messages and responses
296             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
297                 LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
298                         rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
299
300                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
301
302                 return switchBehavior(new Follower(context));
303             }
304         }
305
306         try {
307             if (message instanceof SendHeartBeat) {
308                 sendHeartBeat();
309                 return this;
310
311             } else if(message instanceof InitiateInstallSnapshot) {
312                 installSnapshotIfNeeded();
313
314             } else if(message instanceof SendInstallSnapshot) {
315                 // received from RaftActor
316                 setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
317                 sendInstallSnapshot();
318
319             } else if (message instanceof Replicate) {
320                 replicate((Replicate) message);
321
322             } else if (message instanceof InstallSnapshotReply){
323                 handleInstallSnapshotReply((InstallSnapshotReply) message);
324
325             }
326         } finally {
327             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
328         }
329
330         return super.handleMessage(sender, message);
331     }
332
333     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
334         String followerId = reply.getFollowerId();
335         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
336
337         if (followerToSnapshot == null) {
338             LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
339                     context.getId(), followerId);
340             return;
341         }
342
343         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
344         followerLogInformation.markFollowerActive();
345
346         if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
347             if (reply.isSuccess()) {
348                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
349                     //this was the last chunk reply
350                     if(LOG.isDebugEnabled()) {
351                         LOG.debug("{}: InstallSnapshotReply received, " +
352                                 "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
353                                 context.getId(), reply.getChunkIndex(), followerId,
354                             context.getReplicatedLog().getSnapshotIndex() + 1
355                         );
356                     }
357
358                     followerLogInformation.setMatchIndex(
359                         context.getReplicatedLog().getSnapshotIndex());
360                     followerLogInformation.setNextIndex(
361                         context.getReplicatedLog().getSnapshotIndex() + 1);
362                     mapFollowerToSnapshot.remove(followerId);
363
364                     if(LOG.isDebugEnabled()) {
365                         LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" +
366                                 context.getId(), followerToLog.get(followerId).getNextIndex());
367                     }
368
369                     if (mapFollowerToSnapshot.isEmpty()) {
370                         // once there are no pending followers receiving snapshots
371                         // we can remove snapshot from the memory
372                         setSnapshot(Optional.<ByteString>absent());
373                     }
374
375                 } else {
376                     followerToSnapshot.markSendStatus(true);
377                 }
378             } else {
379                 LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
380                         context.getId(), reply.getChunkIndex());
381
382                 followerToSnapshot.markSendStatus(false);
383             }
384         } else {
385             LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
386                     context.getId(), reply.getChunkIndex(), followerId,
387                     followerToSnapshot.getChunkIndex());
388
389             if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
390                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
391                 // so that Installing the snapshot can resume from the beginning
392                 followerToSnapshot.reset();
393             }
394         }
395     }
396
397     private void replicate(Replicate replicate) {
398         long logIndex = replicate.getReplicatedLogEntry().getIndex();
399
400         if(LOG.isDebugEnabled()) {
401             LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
402         }
403
404         // Create a tracker entry we will use this later to notify the
405         // client actor
406         trackerList.add(
407             new ClientRequestTrackerImpl(replicate.getClientActor(),
408                 replicate.getIdentifier(),
409                 logIndex)
410         );
411
412         if (followerToLog.isEmpty()) {
413             context.setCommitIndex(logIndex);
414             applyLogToStateMachine(logIndex);
415         } else {
416             sendAppendEntries();
417         }
418     }
419
420     private void sendAppendEntries() {
421         // Send an AppendEntries to all followers
422
423         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
424             final String followerId = e.getKey();
425             ActorSelection followerActor = context.getPeerActorSelection(followerId);
426
427             if (followerActor != null) {
428                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
429                 long followerNextIndex = followerLogInformation.getNextIndex();
430                 boolean isFollowerActive = followerLogInformation.isFollowerActive();
431
432                 FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
433                 if (followerToSnapshot != null) {
434                     // if install snapshot is in process , then sent next chunk if possible
435                     if (isFollowerActive && followerToSnapshot.canSendNextChunk()) {
436                         sendSnapshotChunk(followerActor, followerId);
437                     } else {
438                         // we send a heartbeat even if we have not received a reply for the last chunk
439                         sendAppendEntriesToFollower(followerActor, followerNextIndex,
440                             Collections.<ReplicatedLogEntry>emptyList(), followerId);
441                     }
442
443                 } else {
444                     long leaderLastIndex = context.getReplicatedLog().lastIndex();
445                     long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
446                     final List<ReplicatedLogEntry> entries;
447
448                     LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
449                             context.getId(), leaderLastIndex, leaderSnapShotIndex);
450
451                     if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
452                         LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(),
453                                 followerNextIndex, followerId);
454
455                         // FIXME : Sending one entry at a time
456                         entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
457
458                     } else if (isFollowerActive && followerNextIndex >= 0 &&
459                         leaderLastIndex >= followerNextIndex ) {
460                         // if the followers next index is not present in the leaders log, and
461                         // if the follower is just not starting and if leader's index is more than followers index
462                         // then snapshot should be sent
463
464                         if(LOG.isDebugEnabled()) {
465                             LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
466                                     "follower-nextIndex: %s, leader-snapshot-index: %s,  " +
467                                     "leader-last-index: %s", context.getId(), followerId,
468                                 followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
469                         }
470                         actor().tell(new InitiateInstallSnapshot(), actor());
471
472                         // we would want to sent AE as the capture snapshot might take time
473                         entries =  Collections.<ReplicatedLogEntry>emptyList();
474
475                     } else {
476                         //we send an AppendEntries, even if the follower is inactive
477                         // in-order to update the followers timestamp, in case it becomes active again
478                         entries =  Collections.<ReplicatedLogEntry>emptyList();
479                     }
480
481                     sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
482                 }
483             }
484         }
485     }
486
487     private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
488         List<ReplicatedLogEntry> entries, String followerId) {
489         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
490             prevLogIndex(followerNextIndex),
491             prevLogTerm(followerNextIndex), entries,
492             context.getCommitIndex(), replicatedToAllIndex);
493
494         if(!entries.isEmpty()) {
495             LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
496                     appendEntries);
497         }
498
499         followerActor.tell(appendEntries.toSerializable(), actor());
500     }
501
502     /**
503      * An installSnapshot is scheduled at a interval that is a multiple of
504      * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
505      * snapshots at every heartbeat.
506      *
507      * Install Snapshot works as follows
508      * 1. Leader sends a InitiateInstallSnapshot message to self
509      * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
510      * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
511      * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
512      * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
513      * 5. On complete, Follower sends back a InstallSnapshotReply.
514      * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
515      * and replenishes the memory by deleting the snapshot in Replicated log.
516      *
517      */
518     private void installSnapshotIfNeeded() {
519         if(LOG.isDebugEnabled()) {
520             LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
521         }
522
523         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
524             final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
525
526             if (followerActor != null) {
527                 long nextIndex = e.getValue().getNextIndex();
528
529                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
530                         context.getReplicatedLog().isInSnapshot(nextIndex)) {
531                     LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
532                     if (snapshot.isPresent()) {
533                         // if a snapshot is present in the memory, most likely another install is in progress
534                         // no need to capture snapshot
535                         sendSnapshotChunk(followerActor, e.getKey());
536
537                     } else if (!context.isSnapshotCaptureInitiated()) {
538                         initiateCaptureSnapshot();
539                         //we just need 1 follower who would need snapshot to be installed.
540                         // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
541                         // who needs an install and send to all who need
542                         break;
543                     }
544
545                 }
546             }
547         }
548     }
549
550     // on every install snapshot, we try to capture the snapshot.
551     // Once a capture is going on, another one issued will get ignored by RaftActor.
552     private void initiateCaptureSnapshot() {
553         LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
554         ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
555         long lastAppliedIndex = -1;
556         long lastAppliedTerm = -1;
557
558         if (lastAppliedEntry != null) {
559             lastAppliedIndex = lastAppliedEntry.getIndex();
560             lastAppliedTerm = lastAppliedEntry.getTerm();
561         } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
562             lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
563             lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
564         }
565
566         boolean isInstallSnapshotInitiated = true;
567         actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
568                 lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
569             actor());
570         context.setSnapshotCaptureInitiated(true);
571     }
572
573
574     private void sendInstallSnapshot() {
575         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
576             ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
577
578             if (followerActor != null) {
579                 long nextIndex = e.getValue().getNextIndex();
580
581                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
582                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
583                     sendSnapshotChunk(followerActor, e.getKey());
584                 }
585             }
586         }
587     }
588
589     /**
590      *  Sends a snapshot chunk to a given follower
591      *  InstallSnapshot should qualify as a heartbeat too.
592      */
593     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
594         try {
595             if (snapshot.isPresent()) {
596                 ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
597
598                 // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
599                 // followerId to the followerToSnapshot map.
600                 FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
601
602                 followerActor.tell(
603                     new InstallSnapshot(currentTerm(), context.getId(),
604                         context.getReplicatedLog().getSnapshotIndex(),
605                         context.getReplicatedLog().getSnapshotTerm(),
606                         nextSnapshotChunk,
607                             followerToSnapshot.incrementChunkIndex(),
608                             followerToSnapshot.getTotalChunks(),
609                         Optional.of(followerToSnapshot.getLastChunkHashCode())
610                     ).toSerializable(),
611                     actor()
612                 );
613                 LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
614                         context.getId(), followerActor.path(),
615                         followerToSnapshot.getChunkIndex(),
616                         followerToSnapshot.getTotalChunks());
617             }
618         } catch (IOException e) {
619             LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
620         }
621     }
622
623     /**
624      * Acccepts snaphot as ByteString, enters into map for future chunks
625      * creates and return a ByteString chunk
626      */
627     private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
628         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
629         if (followerToSnapshot == null) {
630             followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
631             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
632         }
633         ByteString nextChunk = followerToSnapshot.getNextChunk();
634         if (LOG.isDebugEnabled()) {
635             LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
636         }
637         return nextChunk;
638     }
639
640     private void sendHeartBeat() {
641         if (!followerToLog.isEmpty()) {
642             sendAppendEntries();
643         }
644     }
645
646     private void stopHeartBeat() {
647         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
648             heartbeatSchedule.cancel();
649         }
650     }
651
652     private void scheduleHeartBeat(FiniteDuration interval) {
653         if (followerToLog.isEmpty()) {
654             // Optimization - do not bother scheduling a heartbeat as there are
655             // no followers
656             return;
657         }
658
659         stopHeartBeat();
660
661         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
662         // message is sent to itself.
663         // Scheduling the heartbeat only once here because heartbeats do not
664         // need to be sent if there are other messages being sent to the remote
665         // actor.
666         heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
667             interval, context.getActor(), new SendHeartBeat(),
668             context.getActorSystem().dispatcher(), context.getActor());
669     }
670
671     @Override
672     public void close() throws Exception {
673         stopHeartBeat();
674     }
675
676     @Override
677     public String getLeaderId() {
678         return context.getId();
679     }
680
681     protected boolean isLeaderIsolated() {
682         int minPresent = minIsolatedLeaderPeerCount;
683         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
684             if (followerLogInformation.isFollowerActive()) {
685                 --minPresent;
686                 if (minPresent == 0) {
687                     break;
688                 }
689             }
690         }
691         return (minPresent != 0);
692     }
693
694     /**
695      * Encapsulates the snapshot bytestring and handles the logic of sending
696      * snapshot chunks
697      */
698     protected class FollowerToSnapshot {
699         private final ByteString snapshotBytes;
700         private int offset = 0;
701         // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
702         private int replyReceivedForOffset;
703         // if replyStatus is false, the previous chunk is attempted
704         private boolean replyStatus = false;
705         private int chunkIndex;
706         private final int totalChunks;
707         private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
708         private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
709
710         public FollowerToSnapshot(ByteString snapshotBytes) {
711             this.snapshotBytes = snapshotBytes;
712             int size = snapshotBytes.size();
713             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
714                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
715             if(LOG.isDebugEnabled()) {
716                 LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
717                         context.getId(), size, totalChunks);
718             }
719             replyReceivedForOffset = -1;
720             chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
721         }
722
723         public ByteString getSnapshotBytes() {
724             return snapshotBytes;
725         }
726
727         public int incrementOffset() {
728             if(replyStatus) {
729                 // if prev chunk failed, we would want to sent the same chunk again
730                 offset = offset + context.getConfigParams().getSnapshotChunkSize();
731             }
732             return offset;
733         }
734
735         public int incrementChunkIndex() {
736             if (replyStatus) {
737                 // if prev chunk failed, we would want to sent the same chunk again
738                 chunkIndex =  chunkIndex + 1;
739             }
740             return chunkIndex;
741         }
742
743         public int getChunkIndex() {
744             return chunkIndex;
745         }
746
747         public int getTotalChunks() {
748             return totalChunks;
749         }
750
751         public boolean canSendNextChunk() {
752             // we only send a false if a chunk is sent but we have not received a reply yet
753             return replyReceivedForOffset == offset;
754         }
755
756         public boolean isLastChunk(int chunkIndex) {
757             return totalChunks == chunkIndex;
758         }
759
760         public void markSendStatus(boolean success) {
761             if (success) {
762                 // if the chunk sent was successful
763                 replyReceivedForOffset = offset;
764                 replyStatus = true;
765                 lastChunkHashCode = nextChunkHashCode;
766             } else {
767                 // if the chunk sent was failure
768                 replyReceivedForOffset = offset;
769                 replyStatus = false;
770             }
771         }
772
773         public ByteString getNextChunk() {
774             int snapshotLength = getSnapshotBytes().size();
775             int start = incrementOffset();
776             int size = context.getConfigParams().getSnapshotChunkSize();
777             if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
778                 size = snapshotLength;
779             } else {
780                 if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
781                     size = snapshotLength - start;
782                 }
783             }
784
785             if(LOG.isDebugEnabled()) {
786                 LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
787                     snapshotLength, start, size);
788             }
789             ByteString substring = getSnapshotBytes().substring(start, start + size);
790             nextChunkHashCode = substring.hashCode();
791             return substring;
792         }
793
794         /**
795          * reset should be called when the Follower needs to be sent the snapshot from the beginning
796          */
797         public void reset(){
798             offset = 0;
799             replyStatus = false;
800             replyReceivedForOffset = offset;
801             chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
802             lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
803         }
804
805         public int getLastChunkHashCode() {
806             return lastChunkHashCode;
807         }
808     }
809
810     // called from example-actor for printing the follower-states
811     public String printFollowerStates() {
812         final StringBuilder sb = new StringBuilder();
813
814         sb.append('[');
815         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
816             sb.append('{');
817             sb.append(followerLogInformation.getId());
818             sb.append(" state:");
819             sb.append(followerLogInformation.isFollowerActive());
820             sb.append("},");
821         }
822         sb.append(']');
823
824         return sb.toString();
825     }
826
827     @VisibleForTesting
828     public FollowerLogInformation getFollower(String followerId) {
829         return followerToLog.get(followerId);
830     }
831
832     @VisibleForTesting
833     protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
834         mapFollowerToSnapshot.put(followerId, snapshot);
835     }
836
837     @VisibleForTesting
838     public int followerSnapshotSize() {
839         return mapFollowerToSnapshot.size();
840     }
841
842     @VisibleForTesting
843     public int followerLogSize() {
844         return followerToLog.size();
845     }
846 }