Raft behavior logging
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.ImmutableMap;
18 import com.google.common.collect.ImmutableMap.Builder;
19 import com.google.protobuf.ByteString;
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
30 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
31 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
32 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
33 import org.opendaylight.controller.cluster.raft.RaftActorContext;
34 import org.opendaylight.controller.cluster.raft.RaftState;
35 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
36 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
37 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
38 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
39 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
42 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
44 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
45 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
46 import scala.concurrent.duration.FiniteDuration;
47
48 /**
49  * The behavior of a RaftActor when it is in the Leader state.
50  * <p>
51  * Leaders:
52  * <ul>
53  * <li> Upon election: send initial empty AppendEntries RPCs
54  * (heartbeat) to each server; repeat during idle periods to
55  * prevent election timeouts (§5.2)
56  * <li> If command received from client: append entry to local log,
57  * respond after entry applied to state machine (§5.3)
58  * <li> If last log index ≥ nextIndex for a follower: send
59  * AppendEntries RPC with log entries starting at nextIndex
60  * <ul>
61  * <li> If successful: update nextIndex and matchIndex for
62  * follower (§5.3)
63  * <li> If AppendEntries fails because of log inconsistency:
64  * decrement nextIndex and retry (§5.3)
65  * </ul>
66  * <li> If there exists an N such that N > commitIndex, a majority
67  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
68  * set commitIndex = N (§5.3, §5.4).
69  */
70 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
71
72     // The index of the first chunk that is sent when installing a snapshot
73     public static final int FIRST_CHUNK_INDEX = 1;
74
75     // The index that the follower should respond with if it needs the install snapshot to be reset
76     public static final int INVALID_CHUNK_INDEX = -1;
77
78     // This would be passed as the hash code of the last chunk when sending the first chunk
79     public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
80
81     private final Map<String, FollowerLogInformation> followerToLog;
82     private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
83
84     private Cancellable heartbeatSchedule = null;
85
86     private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
87
88     protected final int minReplicationCount;
89
90     protected final int minIsolatedLeaderPeerCount;
91
92     private Optional<ByteString> snapshot;
93
94     public AbstractLeader(RaftActorContext context) {
95         super(context, RaftState.Leader);
96
97         final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
98         for (String followerId : context.getPeerAddresses().keySet()) {
99             FollowerLogInformation followerLogInformation =
100                 new FollowerLogInformationImpl(followerId,
101                     context.getCommitIndex(), -1,
102                     context.getConfigParams().getElectionTimeOutInterval());
103
104             ftlBuilder.put(followerId, followerLogInformation);
105         }
106         followerToLog = ftlBuilder.build();
107
108         leaderId = context.getId();
109
110         LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
111
112         minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
113
114         // the isolated Leader peer count will be 1 less than the majority vote count.
115         // this is because the vote count has the self vote counted in it
116         // for e.g
117         // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
118         // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
119         // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
120         minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
121
122         snapshot = Optional.absent();
123
124         // Immediately schedule a heartbeat
125         // Upon election: send initial empty AppendEntries RPCs
126         // (heartbeat) to each server; repeat during idle periods to
127         // prevent election timeouts (§5.2)
128         sendAppendEntries(0, false);
129     }
130
131     /**
132      * Return an immutable collection of follower identifiers.
133      *
134      * @return Collection of follower IDs
135      */
136     protected final Collection<String> getFollowerIds() {
137         return followerToLog.keySet();
138     }
139
140     @VisibleForTesting
141     void setSnapshot(Optional<ByteString> snapshot) {
142         this.snapshot = snapshot;
143     }
144
145     @Override
146     protected RaftActorBehavior handleAppendEntries(ActorRef sender,
147         AppendEntries appendEntries) {
148
149         LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
150
151         return this;
152     }
153
154     @Override
155     protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
156         AppendEntriesReply appendEntriesReply) {
157
158         if(LOG.isTraceEnabled()) {
159             LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
160         } else if(LOG.isDebugEnabled() && !appendEntriesReply.isSuccess()) {
161             LOG.debug("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
162         }
163
164         // Update the FollowerLogInformation
165         String followerId = appendEntriesReply.getFollowerId();
166         FollowerLogInformation followerLogInformation =
167             followerToLog.get(followerId);
168
169         if(followerLogInformation == null){
170             LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
171             return this;
172         }
173
174         followerLogInformation.markFollowerActive();
175
176         if (appendEntriesReply.isSuccess()) {
177             followerLogInformation
178                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
179             followerLogInformation
180                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
181         } else {
182
183             // TODO: When we find that the follower is out of sync with the
184             // Leader we simply decrement that followers next index by 1.
185             // Would it be possible to do better than this? The RAFT spec
186             // does not explicitly deal with it but may be something for us to
187             // think about
188
189             followerLogInformation.decrNextIndex();
190         }
191
192         // Now figure out if this reply warrants a change in the commitIndex
193         // If there exists an N such that N > commitIndex, a majority
194         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
195         // set commitIndex = N (§5.3, §5.4).
196         for (long N = context.getCommitIndex() + 1; ; N++) {
197             int replicatedCount = 1;
198
199             for (FollowerLogInformation info : followerToLog.values()) {
200                 if (info.getMatchIndex() >= N) {
201                     replicatedCount++;
202                 }
203             }
204
205             if (replicatedCount >= minReplicationCount) {
206                 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
207                 if (replicatedLogEntry != null &&
208                     replicatedLogEntry.getTerm() == currentTerm()) {
209                     context.setCommitIndex(N);
210                 }
211             } else {
212                 break;
213             }
214         }
215
216         // Apply the change to the state machine
217         if (context.getCommitIndex() > context.getLastApplied()) {
218             LOG.debug("{}: handleAppendEntriesReply: applying to log - commitIndex: {}, lastAppliedIndex: {}",
219                     logName(), context.getCommitIndex(), context.getLastApplied());
220
221             applyLogToStateMachine(context.getCommitIndex());
222         }
223
224         if (!context.isSnapshotCaptureInitiated()) {
225             purgeInMemoryLog();
226         }
227
228         //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
229         sendUpdatesToFollower(followerId, followerLogInformation, false, false);
230         return this;
231     }
232
233     private void purgeInMemoryLog() {
234         //find the lowest index across followers which has been replicated to all.
235         // lastApplied if there are no followers, so that we keep clearing the log for single-node
236         // we would delete the in-mem log from that index on, in-order to minimize mem usage
237         // we would also share this info thru AE with the followers so that they can delete their log entries as well.
238         long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
239         for (FollowerLogInformation info : followerToLog.values()) {
240             minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
241         }
242
243         super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
244     }
245
246     @Override
247     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
248         final Iterator<ClientRequestTracker> it = trackerList.iterator();
249         while (it.hasNext()) {
250             final ClientRequestTracker t = it.next();
251             if (t.getIndex() == logIndex) {
252                 it.remove();
253                 return t;
254             }
255         }
256
257         return null;
258     }
259
260     @Override
261     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
262         for (ClientRequestTracker tracker : trackerList) {
263             if (tracker.getIndex() == logIndex) {
264                 return tracker;
265             }
266         }
267         return null;
268     }
269
270     @Override
271     protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
272         RequestVoteReply requestVoteReply) {
273         return this;
274     }
275
276     @Override
277     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
278         Preconditions.checkNotNull(sender, "sender should not be null");
279
280         Object message = fromSerializableMessage(originalMessage);
281
282         if (message instanceof RaftRPC) {
283             RaftRPC rpc = (RaftRPC) message;
284             // If RPC request or response contains term T > currentTerm:
285             // set currentTerm = T, convert to follower (§5.1)
286             // This applies to all RPC messages and responses
287             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
288                 LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
289                         logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
290
291                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
292
293                 return switchBehavior(new Follower(context));
294             }
295         }
296
297         try {
298             if (message instanceof SendHeartBeat) {
299                 sendHeartBeat();
300                 return this;
301
302             } else if(message instanceof SendInstallSnapshot) {
303                 // received from RaftActor
304                 setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
305                 sendInstallSnapshot();
306
307             } else if (message instanceof Replicate) {
308                 replicate((Replicate) message);
309
310             } else if (message instanceof InstallSnapshotReply){
311                 handleInstallSnapshotReply((InstallSnapshotReply) message);
312
313             }
314         } finally {
315             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
316         }
317
318         return super.handleMessage(sender, message);
319     }
320
321     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
322         LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
323
324         String followerId = reply.getFollowerId();
325         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
326
327         if (followerToSnapshot == null) {
328             LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
329                     logName(), followerId);
330             return;
331         }
332
333         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
334         followerLogInformation.markFollowerActive();
335
336         if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
337             boolean wasLastChunk = false;
338             if (reply.isSuccess()) {
339                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
340                     //this was the last chunk reply
341                     if(LOG.isDebugEnabled()) {
342                         LOG.debug("{}: InstallSnapshotReply received, " +
343                                 "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}",
344                                 logName(), reply.getChunkIndex(), followerId,
345                             context.getReplicatedLog().getSnapshotIndex() + 1
346                         );
347                     }
348
349                     followerLogInformation.setMatchIndex(
350                         context.getReplicatedLog().getSnapshotIndex());
351                     followerLogInformation.setNextIndex(
352                         context.getReplicatedLog().getSnapshotIndex() + 1);
353                     mapFollowerToSnapshot.remove(followerId);
354
355                     LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
356                                 logName(), followerId, followerLogInformation.getMatchIndex(),
357                                 followerLogInformation.getNextIndex());
358
359                     if (mapFollowerToSnapshot.isEmpty()) {
360                         // once there are no pending followers receiving snapshots
361                         // we can remove snapshot from the memory
362                         setSnapshot(Optional.<ByteString>absent());
363                     }
364                     wasLastChunk = true;
365
366                 } else {
367                     followerToSnapshot.markSendStatus(true);
368                 }
369             } else {
370                 LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
371                         logName(), reply.getChunkIndex());
372
373                 followerToSnapshot.markSendStatus(false);
374             }
375
376             if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
377                 ActorSelection followerActor = context.getPeerActorSelection(followerId);
378                 if(followerActor != null) {
379                     sendSnapshotChunk(followerActor, followerId);
380                 }
381             }
382
383         } else {
384             LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
385                     logName(), reply.getChunkIndex(), followerId,
386                     followerToSnapshot.getChunkIndex());
387
388             if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
389                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
390                 // so that Installing the snapshot can resume from the beginning
391                 followerToSnapshot.reset();
392             }
393         }
394     }
395
396     private void replicate(Replicate replicate) {
397         long logIndex = replicate.getReplicatedLogEntry().getIndex();
398
399         LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(),
400                 replicate.getIdentifier(), logIndex);
401
402         // Create a tracker entry we will use this later to notify the
403         // client actor
404         trackerList.add(
405             new ClientRequestTrackerImpl(replicate.getClientActor(),
406                 replicate.getIdentifier(),
407                 logIndex)
408         );
409
410         if (followerToLog.isEmpty()) {
411             context.setCommitIndex(logIndex);
412             applyLogToStateMachine(logIndex);
413         } else {
414             sendAppendEntries(0, false);
415         }
416     }
417
418     private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
419         // Send an AppendEntries to all followers
420         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
421             final String followerId = e.getKey();
422             final FollowerLogInformation followerLogInformation = e.getValue();
423             // This checks helps not to send a repeat message to the follower
424             if(!followerLogInformation.isFollowerActive() ||
425                     followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
426                 sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat);
427             }
428         }
429     }
430
431     /**
432      *
433      * This method checks if any update needs to be sent to the given follower. This includes append log entries,
434      * sending next snapshot chunk, and initiating a snapshot.
435      * @return true if any update is sent, false otherwise
436      */
437
438     private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
439                                        boolean sendHeartbeat, boolean isHeartbeat) {
440
441         ActorSelection followerActor = context.getPeerActorSelection(followerId);
442         if (followerActor != null) {
443             long followerNextIndex = followerLogInformation.getNextIndex();
444             boolean isFollowerActive = followerLogInformation.isFollowerActive();
445
446             if (mapFollowerToSnapshot.get(followerId) != null) {
447                 // if install snapshot is in process , then sent next chunk if possible
448                 if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
449                     sendSnapshotChunk(followerActor, followerId);
450                 } else if(sendHeartbeat) {
451                     // we send a heartbeat even if we have not received a reply for the last chunk
452                     sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
453                         Collections.<ReplicatedLogEntry>emptyList(), followerId);
454                 }
455             } else {
456                 long leaderLastIndex = context.getReplicatedLog().lastIndex();
457                 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
458
459                 if(!isHeartbeat || LOG.isTraceEnabled()) {
460                     LOG.debug("{}: Checking sendAppendEntries for follower {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
461                             logName(), followerId, leaderLastIndex, leaderSnapShotIndex);
462                 }
463
464                 if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
465
466                     LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
467                             followerNextIndex, followerId);
468
469                     // FIXME : Sending one entry at a time
470                     final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
471
472                     sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
473
474                 } else if (isFollowerActive && followerNextIndex >= 0 &&
475                     leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
476                     // if the followers next index is not present in the leaders log, and
477                     // if the follower is just not starting and if leader's index is more than followers index
478                     // then snapshot should be sent
479
480                     if (LOG.isDebugEnabled()) {
481                         LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
482                                     "follower-nextIndex: %d, leader-snapshot-index: %d,  " +
483                                     "leader-last-index: %d", logName(), followerId,
484                                     followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
485                     }
486
487                     // Send heartbeat to follower whenever install snapshot is initiated.
488                     sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
489                             Collections.<ReplicatedLogEntry>emptyList(), followerId);
490
491                     initiateCaptureSnapshot(followerId, followerNextIndex);
492
493                 } else if(sendHeartbeat) {
494                     //we send an AppendEntries, even if the follower is inactive
495                     // in-order to update the followers timestamp, in case it becomes active again
496                     sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
497                         Collections.<ReplicatedLogEntry>emptyList(), followerId);
498                 }
499
500             }
501         }
502     }
503
504     private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
505         List<ReplicatedLogEntry> entries, String followerId) {
506         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
507             prevLogIndex(followerNextIndex),
508             prevLogTerm(followerNextIndex), entries,
509             context.getCommitIndex(), super.getReplicatedToAllIndex());
510
511         if(!entries.isEmpty() || LOG.isTraceEnabled()) {
512             LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
513                     appendEntries);
514         }
515
516         followerActor.tell(appendEntries.toSerializable(), actor());
517     }
518
519     /**
520      * Install Snapshot works as follows
521      * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
522      * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
523      * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
524      * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
525      * 4. On complete, Follower sends back a InstallSnapshotReply.
526      * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
527      * and replenishes the memory by deleting the snapshot in Replicated log.
528      * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
529      * then send the existing snapshot in chunks to the follower.
530      * @param followerId
531      * @param followerNextIndex
532      */
533     private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
534         if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
535                 context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
536
537             if (snapshot.isPresent()) {
538                 // if a snapshot is present in the memory, most likely another install is in progress
539                 // no need to capture snapshot.
540                 // This could happen if another follower needs an install when one is going on.
541                 final ActorSelection followerActor = context.getPeerActorSelection(followerId);
542                 sendSnapshotChunk(followerActor, followerId);
543
544             } else if (!context.isSnapshotCaptureInitiated()) {
545
546                 LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", logName(), getLeaderId());
547                 ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
548                 long lastAppliedIndex = -1;
549                 long lastAppliedTerm = -1;
550
551                 if (lastAppliedEntry != null) {
552                     lastAppliedIndex = lastAppliedEntry.getIndex();
553                     lastAppliedTerm = lastAppliedEntry.getTerm();
554                 } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
555                     lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
556                     lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
557                 }
558
559                 boolean isInstallSnapshotInitiated = true;
560                 long replicatedToAllIndex = super.getReplicatedToAllIndex();
561                 ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
562                 actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
563                     (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
564                     (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
565                     isInstallSnapshotInitiated), actor());
566                 context.setSnapshotCaptureInitiated(true);
567             }
568         }
569     }
570
571
572     private void sendInstallSnapshot() {
573         LOG.debug("{}: sendInstallSnapshot", logName());
574         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
575             ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
576
577             if (followerActor != null) {
578                 long nextIndex = e.getValue().getNextIndex();
579
580                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
581                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
582                     sendSnapshotChunk(followerActor, e.getKey());
583                 }
584             }
585         }
586     }
587
588     /**
589      *  Sends a snapshot chunk to a given follower
590      *  InstallSnapshot should qualify as a heartbeat too.
591      */
592     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
593         try {
594             if (snapshot.isPresent()) {
595                 ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
596
597                 // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
598                 // followerId to the followerToSnapshot map.
599                 FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
600
601                 followerActor.tell(
602                     new InstallSnapshot(currentTerm(), context.getId(),
603                         context.getReplicatedLog().getSnapshotIndex(),
604                         context.getReplicatedLog().getSnapshotTerm(),
605                         nextSnapshotChunk,
606                         followerToSnapshot.incrementChunkIndex(),
607                         followerToSnapshot.getTotalChunks(),
608                         Optional.of(followerToSnapshot.getLastChunkHashCode())
609                     ).toSerializable(),
610                     actor()
611                 );
612                 LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
613                         logName(), followerActor.path(),
614                         followerToSnapshot.getChunkIndex(),
615                         followerToSnapshot.getTotalChunks());
616             }
617         } catch (IOException e) {
618             LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e);
619         }
620     }
621
622     /**
623      * Acccepts snaphot as ByteString, enters into map for future chunks
624      * creates and return a ByteString chunk
625      */
626     private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
627         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
628         if (followerToSnapshot == null) {
629             followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
630             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
631         }
632         ByteString nextChunk = followerToSnapshot.getNextChunk();
633
634         LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
635
636         return nextChunk;
637     }
638
639     private void sendHeartBeat() {
640         if (!followerToLog.isEmpty()) {
641             LOG.trace("{}: Sending heartbeat", logName());
642             sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
643         }
644     }
645
646     private void stopHeartBeat() {
647         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
648             heartbeatSchedule.cancel();
649         }
650     }
651
652     private void scheduleHeartBeat(FiniteDuration interval) {
653         if (followerToLog.isEmpty()) {
654             // Optimization - do not bother scheduling a heartbeat as there are
655             // no followers
656             return;
657         }
658
659         stopHeartBeat();
660
661         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
662         // message is sent to itself.
663         // Scheduling the heartbeat only once here because heartbeats do not
664         // need to be sent if there are other messages being sent to the remote
665         // actor.
666         heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
667             interval, context.getActor(), new SendHeartBeat(),
668             context.getActorSystem().dispatcher(), context.getActor());
669     }
670
671     @Override
672     public void close() throws Exception {
673         stopHeartBeat();
674     }
675
676     @Override
677     public String getLeaderId() {
678         return context.getId();
679     }
680
681     protected boolean isLeaderIsolated() {
682         int minPresent = minIsolatedLeaderPeerCount;
683         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
684             if (followerLogInformation.isFollowerActive()) {
685                 --minPresent;
686                 if (minPresent == 0) {
687                     break;
688                 }
689             }
690         }
691         return (minPresent != 0);
692     }
693
694     /**
695      * Encapsulates the snapshot bytestring and handles the logic of sending
696      * snapshot chunks
697      */
698     protected class FollowerToSnapshot {
699         private final ByteString snapshotBytes;
700         private int offset = 0;
701         // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
702         private int replyReceivedForOffset;
703         // if replyStatus is false, the previous chunk is attempted
704         private boolean replyStatus = false;
705         private int chunkIndex;
706         private final int totalChunks;
707         private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
708         private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
709
710         public FollowerToSnapshot(ByteString snapshotBytes) {
711             this.snapshotBytes = snapshotBytes;
712             int size = snapshotBytes.size();
713             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
714                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
715             if(LOG.isDebugEnabled()) {
716                 LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
717                         logName(), size, totalChunks);
718             }
719             replyReceivedForOffset = -1;
720             chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
721         }
722
723         public ByteString getSnapshotBytes() {
724             return snapshotBytes;
725         }
726
727         public int incrementOffset() {
728             if(replyStatus) {
729                 // if prev chunk failed, we would want to sent the same chunk again
730                 offset = offset + context.getConfigParams().getSnapshotChunkSize();
731             }
732             return offset;
733         }
734
735         public int incrementChunkIndex() {
736             if (replyStatus) {
737                 // if prev chunk failed, we would want to sent the same chunk again
738                 chunkIndex =  chunkIndex + 1;
739             }
740             return chunkIndex;
741         }
742
743         public int getChunkIndex() {
744             return chunkIndex;
745         }
746
747         public int getTotalChunks() {
748             return totalChunks;
749         }
750
751         public boolean canSendNextChunk() {
752             // we only send a false if a chunk is sent but we have not received a reply yet
753             return replyReceivedForOffset == offset;
754         }
755
756         public boolean isLastChunk(int chunkIndex) {
757             return totalChunks == chunkIndex;
758         }
759
760         public void markSendStatus(boolean success) {
761             if (success) {
762                 // if the chunk sent was successful
763                 replyReceivedForOffset = offset;
764                 replyStatus = true;
765                 lastChunkHashCode = nextChunkHashCode;
766             } else {
767                 // if the chunk sent was failure
768                 replyReceivedForOffset = offset;
769                 replyStatus = false;
770             }
771         }
772
773         public ByteString getNextChunk() {
774             int snapshotLength = getSnapshotBytes().size();
775             int start = incrementOffset();
776             int size = context.getConfigParams().getSnapshotChunkSize();
777             if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
778                 size = snapshotLength;
779             } else {
780                 if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
781                     size = snapshotLength - start;
782                 }
783             }
784
785
786             LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
787                     snapshotLength, start, size);
788
789             ByteString substring = getSnapshotBytes().substring(start, start + size);
790             nextChunkHashCode = substring.hashCode();
791             return substring;
792         }
793
794         /**
795          * reset should be called when the Follower needs to be sent the snapshot from the beginning
796          */
797         public void reset(){
798             offset = 0;
799             replyStatus = false;
800             replyReceivedForOffset = offset;
801             chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
802             lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
803         }
804
805         public int getLastChunkHashCode() {
806             return lastChunkHashCode;
807         }
808     }
809
810     // called from example-actor for printing the follower-states
811     public String printFollowerStates() {
812         final StringBuilder sb = new StringBuilder();
813
814         sb.append('[');
815         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
816             sb.append('{');
817             sb.append(followerLogInformation.getId());
818             sb.append(" state:");
819             sb.append(followerLogInformation.isFollowerActive());
820             sb.append("},");
821         }
822         sb.append(']');
823
824         return sb.toString();
825     }
826
827     @VisibleForTesting
828     public FollowerLogInformation getFollower(String followerId) {
829         return followerToLog.get(followerId);
830     }
831
832     @VisibleForTesting
833     protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
834         mapFollowerToSnapshot.put(followerId, snapshot);
835     }
836
837     @VisibleForTesting
838     public int followerSnapshotSize() {
839         return mapFollowerToSnapshot.size();
840     }
841
842     @VisibleForTesting
843     public int followerLogSize() {
844         return followerToLog.size();
845     }
846 }