31bf99c2dc2bc9f5564c256294c605f6af1bd482
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.protobuf.ByteString;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.Iterator;
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Map.Entry;
26 import java.util.Queue;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
29 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
30 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
31 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
32 import org.opendaylight.controller.cluster.raft.PeerInfo;
33 import org.opendaylight.controller.cluster.raft.RaftActorContext;
34 import org.opendaylight.controller.cluster.raft.RaftState;
35 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
36 import org.opendaylight.controller.cluster.raft.Snapshot;
37 import org.opendaylight.controller.cluster.raft.VotingState;
38 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
39 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
40 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
43 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
44 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
45 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
46 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
47 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
48 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
49 import scala.concurrent.duration.FiniteDuration;
50
51 /**
52  * The behavior of a RaftActor when it is in the Leader state
53  *
54  * <p>
55  * Leaders:
56  * <ul>
57  * <li> Upon election: send initial empty AppendEntries RPCs
58  * (heartbeat) to each server; repeat during idle periods to
59  * prevent election timeouts (§5.2)
60  * <li> If command received from client: append entry to local log,
61  * respond after entry applied to state machine (§5.3)
62  * <li> If last log index ≥ nextIndex for a follower: send
63  * AppendEntries RPC with log entries starting at nextIndex
64  * <li> If successful: update nextIndex and matchIndex for
65  * follower (§5.3)
66  * <li> If AppendEntries fails because of log inconsistency:
67  * decrement nextIndex and retry (§5.3)
68  * <li> If there exists an N such that N &gt; commitIndex, a majority
69  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
70  * set commitIndex = N (§5.3, §5.4).
71  * </ul>
72  */
73 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
74     private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
75
76     /**
77      * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
78      * expect the entries to be modified in sequence, hence we open-code the lookup.
79      * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
80      *       but we already expect those to be far from frequent.
81      */
82     private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
83
84     private Cancellable heartbeatSchedule = null;
85     private Optional<SnapshotHolder> snapshot = Optional.absent();
86     private int minReplicationCount;
87
88     protected AbstractLeader(RaftActorContext context, RaftState state,
89             @Nullable AbstractLeader initializeFromLeader) {
90         super(context, state);
91
92         if (initializeFromLeader != null) {
93             followerToLog.putAll(initializeFromLeader.followerToLog);
94             snapshot = initializeFromLeader.snapshot;
95             trackers.addAll(initializeFromLeader.trackers);
96         } else {
97             for (PeerInfo peerInfo: context.getPeers()) {
98                 FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
99                 followerToLog.put(peerInfo.getId(), followerLogInformation);
100             }
101         }
102
103         log.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
104
105         updateMinReplicaCount();
106
107         // Immediately schedule a heartbeat
108         // Upon election: send initial empty AppendEntries RPCs
109         // (heartbeat) to each server; repeat during idle periods to
110         // prevent election timeouts (§5.2)
111         sendAppendEntries(0, false);
112
113         // It is important to schedule this heartbeat here
114         scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
115     }
116
117     protected AbstractLeader(RaftActorContext context, RaftState state) {
118         this(context, state, null);
119     }
120
121     /**
122      * Return an immutable collection of follower identifiers.
123      *
124      * @return Collection of follower IDs
125      */
126     public final Collection<String> getFollowerIds() {
127         return followerToLog.keySet();
128     }
129
130     public void addFollower(String followerId) {
131         FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(
132                 context.getPeerInfo(followerId), -1, context);
133         followerToLog.put(followerId, followerLogInformation);
134
135         if (heartbeatSchedule == null) {
136             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
137         }
138     }
139
140     public void removeFollower(String followerId) {
141         followerToLog.remove(followerId);
142     }
143
144     public void updateMinReplicaCount() {
145         int numVoting = 0;
146         for (PeerInfo peer: context.getPeers()) {
147             if (peer.isVoting()) {
148                 numVoting++;
149             }
150         }
151
152         minReplicationCount = getMajorityVoteCount(numVoting);
153     }
154
155     protected int getMinIsolatedLeaderPeerCount() {
156       //the isolated Leader peer count will be 1 less than the majority vote count.
157         //this is because the vote count has the self vote counted in it
158         //for e.g
159         //0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
160         //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
161         //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
162
163         return minReplicationCount > 0 ? minReplicationCount - 1 : 0;
164     }
165
166     @VisibleForTesting
167     void setSnapshot(@Nullable Snapshot snapshot) {
168         if (snapshot != null) {
169             this.snapshot = Optional.of(new SnapshotHolder(snapshot));
170         } else {
171             this.snapshot = Optional.absent();
172         }
173     }
174
175     @VisibleForTesting
176     boolean hasSnapshot() {
177         return snapshot.isPresent();
178     }
179
180     @Override
181     protected RaftActorBehavior handleAppendEntries(ActorRef sender,
182         AppendEntries appendEntries) {
183
184         log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
185
186         return this;
187     }
188
189     @Override
190     protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
191         log.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
192
193         // Update the FollowerLogInformation
194         String followerId = appendEntriesReply.getFollowerId();
195         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
196
197         if (followerLogInformation == null) {
198             log.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
199             return this;
200         }
201
202         if (followerLogInformation.timeSinceLastActivity()
203                 > context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
204             log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, "
205                     + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
206                     logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(),
207                     context.getLastApplied(), context.getCommitIndex());
208         }
209
210         followerLogInformation.markFollowerActive();
211         followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
212         followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
213
214         long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
215         long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
216         boolean updated = false;
217         if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
218             // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
219             // in raft as a node cannot become leader if it's log is behind another's. However, the
220             // non-voting semantics deviate a bit from raft. Only voting members participate in
221             // elections and can become leader so it's possible for a non-voting follower to be ahead
222             // of the leader. This can happen if persistence is disabled and all voting members are
223             // restarted. In this case, the voting leader will start out with an empty log however
224             // the non-voting followers still retain the previous data in memory. On the first
225             // AppendEntries, the non-voting follower returns a successful reply b/c the prevLogIndex
226             // sent by the leader is -1 and thus the integrity checks pass. However the follower's returned
227             // lastLogIndex may be higher in which case we want to reset the follower by installing a
228             // snapshot. It's also possible that the follower's last log index is behind the leader's.
229             // However in this case the log terms won't match and the logs will conflict - this is handled
230             // elsewhere.
231             log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - "
232                     + "forcing install snaphot", logName(), followerLogInformation.getId(),
233                     appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex());
234
235             followerLogInformation.setMatchIndex(-1);
236             followerLogInformation.setNextIndex(-1);
237
238             initiateCaptureSnapshot(followerId);
239
240             updated = true;
241         } else if (appendEntriesReply.isSuccess()) {
242             if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0
243                     && followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
244                 // The follower's last entry is present in the leader's journal but the terms don't match so the
245                 // follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means
246                 // either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot
247                 // and no longer in the journal. Either way, we set the follower's next index to 1 less than the last
248                 // index reported by the follower. For the former case, the leader will send all entries starting with
249                 // the previous follower's index and the follower will remove and replace the conflicting entries as
250                 // needed. For the latter, the leader will initiate an install snapshot.
251
252                 followerLogInformation.setNextIndex(followerLastLogIndex - 1);
253                 updated = true;
254
255                 log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
256                         + "leader's {} - set the follower's next index to {}", logName(),
257                         followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
258                         followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
259             } else {
260                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
261             }
262         } else {
263             log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
264
265             if (appendEntriesReply.isForceInstallSnapshot()) {
266                 // Reset the followers match and next index. This is to signal that this follower has nothing
267                 // in common with this Leader and so would require a snapshot to be installed
268                 followerLogInformation.setMatchIndex(-1);
269                 followerLogInformation.setNextIndex(-1);
270
271                 // Force initiate a snapshot capture
272                 initiateCaptureSnapshot(followerId);
273             } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0
274                     && followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
275                 // The follower's log is empty or the last entry is present in the leader's journal
276                 // and the terms match so the follower is just behind the leader's journal from
277                 // the last snapshot, if any. We'll catch up the follower quickly by starting at the
278                 // follower's last log index.
279
280                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
281             } else {
282                 // The follower's log conflicts with leader's log so decrement follower's next index by 1
283                 // in an attempt to find where the logs match.
284
285                 followerLogInformation.decrNextIndex();
286                 updated = true;
287
288                 log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
289                         logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
290                         followerLogInformation.getNextIndex());
291             }
292         }
293
294         // Now figure out if this reply warrants a change in the commitIndex
295         // If there exists an N such that N > commitIndex, a majority
296         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
297         // set commitIndex = N (§5.3, §5.4).
298         if (log.isTraceEnabled()) {
299             log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
300                     logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
301         }
302
303         for (long index = context.getCommitIndex() + 1; ; index++) {
304             int replicatedCount = 1;
305
306             log.trace("{}: checking Nth index {}", logName(), index);
307             for (FollowerLogInformation info : followerToLog.values()) {
308                 final PeerInfo peerInfo = context.getPeerInfo(info.getId());
309                 if (info.getMatchIndex() >= index && peerInfo != null && peerInfo.isVoting()) {
310                     replicatedCount++;
311                 } else if (log.isTraceEnabled()) {
312                     log.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
313                             info.getMatchIndex(), peerInfo);
314                 }
315             }
316
317             if (log.isTraceEnabled()) {
318                 log.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount,
319                         minReplicationCount);
320             }
321
322             if (replicatedCount >= minReplicationCount) {
323                 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
324                 if (replicatedLogEntry == null) {
325                     log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
326                             logName(), index, context.getReplicatedLog().getSnapshotIndex(),
327                             context.getReplicatedLog().size());
328                     break;
329                 }
330
331                 // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
332                 // "Raft never commits log entries from previous terms by counting replicas".
333                 // However we keep looping so we can make progress when new entries in the current term
334                 // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
335                 // counting replicas, then all prior entries are committed indirectly".
336                 if (replicatedLogEntry.getTerm() == currentTerm()) {
337                     log.trace("{}: Setting commit index to {}", logName(), index);
338                     context.setCommitIndex(index);
339                 } else {
340                     log.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, "
341                             + "term {} does not match the current term {}", logName(), index,
342                             replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
343                 }
344             } else {
345                 log.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount);
346                 break;
347             }
348         }
349
350         // Apply the change to the state machine
351         if (context.getCommitIndex() > context.getLastApplied()) {
352             if (log.isDebugEnabled()) {
353                 log.debug(
354                     "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
355                     logName(), followerId, context.getCommitIndex(), context.getLastApplied());
356             }
357
358             applyLogToStateMachine(context.getCommitIndex());
359         }
360
361         if (!context.getSnapshotManager().isCapturing()) {
362             purgeInMemoryLog();
363         }
364
365         //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
366         sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
367
368         return this;
369     }
370
371     private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
372             AppendEntriesReply appendEntriesReply) {
373         boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
374         updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
375
376         if (updated && log.isDebugEnabled()) {
377             log.debug(
378                 "{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
379                 logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
380                 followerLogInformation.getNextIndex());
381         }
382         return updated;
383     }
384
385     private void purgeInMemoryLog() {
386         //find the lowest index across followers which has been replicated to all.
387         // lastApplied if there are no followers, so that we keep clearing the log for single-node
388         // we would delete the in-mem log from that index on, in-order to minimize mem usage
389         // we would also share this info thru AE with the followers so that they can delete their log entries as well.
390         long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
391         for (FollowerLogInformation info : followerToLog.values()) {
392             minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
393         }
394
395         super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
396     }
397
398     @Override
399     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
400         final Iterator<ClientRequestTracker> it = trackers.iterator();
401         while (it.hasNext()) {
402             final ClientRequestTracker t = it.next();
403             if (t.getIndex() == logIndex) {
404                 it.remove();
405                 return t;
406             }
407         }
408
409         return null;
410     }
411
412     @Override
413     protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
414         RequestVoteReply requestVoteReply) {
415         return this;
416     }
417
418     protected void beforeSendHeartbeat(){}
419
420     @Override
421     public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
422         Preconditions.checkNotNull(sender, "sender should not be null");
423
424         if (message instanceof RaftRPC) {
425             RaftRPC rpc = (RaftRPC) message;
426             // If RPC request or response contains term T > currentTerm:
427             // set currentTerm = T, convert to follower (§5.1)
428             // This applies to all RPC messages and responses
429             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
430                 log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
431                         logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
432
433                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
434
435                 return internalSwitchBehavior(RaftState.Follower);
436             }
437         }
438
439         if (message instanceof SendHeartBeat) {
440             beforeSendHeartbeat();
441             sendHeartBeat();
442             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
443         } else if (message instanceof SendInstallSnapshot) {
444             // received from RaftActor
445             setSnapshot(((SendInstallSnapshot) message).getSnapshot());
446             sendInstallSnapshot();
447         } else if (message instanceof Replicate) {
448             replicate((Replicate) message);
449         } else if (message instanceof InstallSnapshotReply) {
450             handleInstallSnapshotReply((InstallSnapshotReply) message);
451         } else {
452             return super.handleMessage(sender, message);
453         }
454
455         return this;
456     }
457
458     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
459         log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
460
461         String followerId = reply.getFollowerId();
462         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
463         if (followerLogInformation == null) {
464             // This can happen during AddServer if it times out.
465             log.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
466                     logName(), followerId);
467             return;
468         }
469
470         LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
471         if (installSnapshotState == null) {
472             log.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply",
473                     logName(), followerId);
474             return;
475         }
476
477         followerLogInformation.markFollowerActive();
478
479         if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
480             boolean wasLastChunk = false;
481             if (reply.isSuccess()) {
482                 if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
483                     //this was the last chunk reply
484                     log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -"
485                             + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
486                             context.getReplicatedLog().getSnapshotIndex() + 1);
487
488                     long followerMatchIndex = snapshot.get().getLastIncludedIndex();
489                     followerLogInformation.setMatchIndex(followerMatchIndex);
490                     followerLogInformation.setNextIndex(followerMatchIndex + 1);
491                     followerLogInformation.clearLeaderInstallSnapshotState();
492
493                     log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
494                         logName(), followerId, followerLogInformation.getMatchIndex(),
495                         followerLogInformation.getNextIndex());
496
497                     if (!anyFollowersInstallingSnapshot()) {
498                         // once there are no pending followers receiving snapshots
499                         // we can remove snapshot from the memory
500                         setSnapshot(null);
501                     }
502
503                     wasLastChunk = true;
504                     if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
505                         UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
506                                              new UnInitializedFollowerSnapshotReply(followerId);
507                         context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor());
508                         log.debug("Sent message UnInitializedFollowerSnapshotReply to self");
509                     }
510                 } else {
511                     installSnapshotState.markSendStatus(true);
512                 }
513             } else {
514                 log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
515                         logName(), reply.getChunkIndex());
516
517                 installSnapshotState.markSendStatus(false);
518             }
519
520             if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
521                 // Since the follower is now caught up try to purge the log.
522                 purgeInMemoryLog();
523             } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
524                 ActorSelection followerActor = context.getPeerActorSelection(followerId);
525                 if (followerActor != null) {
526                     sendSnapshotChunk(followerActor, followerLogInformation);
527                 }
528             }
529
530         } else {
531             log.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
532                     logName(), reply.getChunkIndex(), followerId,
533                     installSnapshotState.getChunkIndex());
534
535             if (reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX) {
536                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
537                 // so that Installing the snapshot can resume from the beginning
538                 installSnapshotState.reset();
539             }
540         }
541     }
542
543     private boolean anyFollowersInstallingSnapshot() {
544         for (FollowerLogInformation info: followerToLog.values()) {
545             if (info.getInstallSnapshotState() != null) {
546                 return true;
547             }
548
549         }
550
551         return false;
552     }
553
554     private void replicate(Replicate replicate) {
555         long logIndex = replicate.getReplicatedLogEntry().getIndex();
556
557         log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
558                 replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
559
560         // Create a tracker entry we will use this later to notify the
561         // client actor
562         if (replicate.getClientActor() != null) {
563             trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(),
564                     logIndex));
565         }
566
567         boolean applyModificationToState = !context.anyVotingPeers()
568                 || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
569
570         if (applyModificationToState) {
571             context.setCommitIndex(logIndex);
572             applyLogToStateMachine(logIndex);
573         }
574
575         if (!followerToLog.isEmpty()) {
576             sendAppendEntries(0, false);
577         }
578     }
579
580     protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
581         // Send an AppendEntries to all followers
582         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
583             final String followerId = e.getKey();
584             final FollowerLogInformation followerLogInformation = e.getValue();
585             // This checks helps not to send a repeat message to the follower
586             if (!followerLogInformation.isFollowerActive()
587                     || followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
588                 sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat);
589             }
590         }
591     }
592
593     /**
594      * This method checks if any update needs to be sent to the given follower. This includes append log entries,
595      * sending next snapshot chunk, and initiating a snapshot.
596      *
597      * @return true if any update is sent, false otherwise
598      */
599     private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
600                                        boolean sendHeartbeat, boolean isHeartbeat) {
601
602         ActorSelection followerActor = context.getPeerActorSelection(followerId);
603         if (followerActor != null) {
604             long followerNextIndex = followerLogInformation.getNextIndex();
605             boolean isFollowerActive = followerLogInformation.isFollowerActive();
606             boolean sendAppendEntries = false;
607             List<ReplicatedLogEntry> entries = Collections.emptyList();
608
609             LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
610             if (installSnapshotState != null) {
611                 // if install snapshot is in process , then sent next chunk if possible
612                 if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
613                     sendSnapshotChunk(followerActor, followerLogInformation);
614                 } else if (sendHeartbeat) {
615                     // we send a heartbeat even if we have not received a reply for the last chunk
616                     sendAppendEntries = true;
617                 }
618             } else {
619                 long leaderLastIndex = context.getReplicatedLog().lastIndex();
620                 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
621
622                 if (!isHeartbeat && log.isDebugEnabled() || log.isTraceEnabled()) {
623                     log.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, "
624                             + "leaderLastIndex: {}, leaderSnapShotIndex: {}", logName(), followerId, isFollowerActive,
625                             followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
626                 }
627
628                 if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
629
630                     log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
631                             followerNextIndex, followerId);
632
633                     if (followerLogInformation.okToReplicate()) {
634                         // Try to send all the entries in the journal but not exceeding the max data size
635                         // for a single AppendEntries message.
636                         int maxEntries = (int) context.getReplicatedLog().size();
637                         entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
638                                 context.getConfigParams().getSnapshotChunkSize());
639                         sendAppendEntries = true;
640                     }
641                 } else if (isFollowerActive && followerNextIndex >= 0
642                         && leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
643                     // if the followers next index is not present in the leaders log, and
644                     // if the follower is just not starting and if leader's index is more than followers index
645                     // then snapshot should be sent
646
647                     if (log.isDebugEnabled()) {
648                         log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
649                                     + "follower-nextIndex: %d, leader-snapshot-index: %d,  "
650                                     + "leader-last-index: %d", logName(), followerId,
651                                     followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
652                     }
653
654                     // Send heartbeat to follower whenever install snapshot is initiated.
655                     sendAppendEntries = true;
656                     if (canInstallSnapshot(followerNextIndex)) {
657                         initiateCaptureSnapshot(followerId);
658                     }
659
660                 } else if (sendHeartbeat) {
661                     // we send an AppendEntries, even if the follower is inactive
662                     // in-order to update the followers timestamp, in case it becomes active again
663                     sendAppendEntries = true;
664                 }
665
666             }
667
668             if (sendAppendEntries) {
669                 sendAppendEntriesToFollower(followerActor, entries, followerLogInformation);
670             }
671         }
672     }
673
674     private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
675             FollowerLogInformation followerLogInformation) {
676         // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
677         // possibly committing and applying conflicting entries (those with same index, different term) from a prior
678         // term that weren't replicated to a majority, which would be a violation of raft.
679         //     - if the follower isn't active. In this case we don't know the state of the follower and we send an
680         //       empty AppendEntries as a heart beat to prevent election.
681         //     - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
682         //       need to send AppendEntries to prevent election.
683         boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
684         long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 :
685             context.getCommitIndex();
686
687         long followerNextIndex = followerLogInformation.getNextIndex();
688         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
689             getLogEntryIndex(followerNextIndex - 1),
690             getLogEntryTerm(followerNextIndex - 1), entries,
691             leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
692
693         if (!entries.isEmpty() || log.isTraceEnabled()) {
694             log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
695                     appendEntries);
696         }
697
698         followerActor.tell(appendEntries, actor());
699     }
700
701     /**
702      * Initiates a snapshot capture to install on a follower.
703      *
704      * <p>
705      * Install Snapshot works as follows
706      *   1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
707      *   2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
708      *      the Leader's handleMessage with a SendInstallSnapshot message.
709      *   3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
710      *      the Follower via InstallSnapshot messages.
711      *   4. For each chunk, the Follower sends back an InstallSnapshotReply.
712      *   5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
713      *      follower.
714      *   6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
715      *      then send the existing snapshot in chunks to the follower.
716      *
717      * @param followerId the id of the follower.
718      * @return true if capture was initiated, false otherwise.
719      */
720     public boolean initiateCaptureSnapshot(String followerId) {
721         FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
722         if (snapshot.isPresent()) {
723             // If a snapshot is present in the memory, most likely another install is in progress no need to capture
724             // snapshot. This could happen if another follower needs an install when one is going on.
725             final ActorSelection followerActor = context.getPeerActorSelection(followerId);
726
727             // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
728             sendSnapshotChunk(followerActor, followerLogInfo);
729             return true;
730         } else {
731             boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
732                     this.getReplicatedToAllIndex(), followerId);
733             if (captureInitiated) {
734                 followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
735                         context.getConfigParams().getSnapshotChunkSize(), logName()));
736             }
737
738             return captureInitiated;
739         }
740     }
741
742     private boolean canInstallSnapshot(long nextIndex) {
743         // If the follower's nextIndex is -1 then we might as well send it a snapshot
744         // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
745         // in the snapshot
746         return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex)
747                 && context.getReplicatedLog().isInSnapshot(nextIndex);
748
749     }
750
751
752     private void sendInstallSnapshot() {
753         log.debug("{}: sendInstallSnapshot", logName());
754         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
755             String followerId = e.getKey();
756             ActorSelection followerActor = context.getPeerActorSelection(followerId);
757             FollowerLogInformation followerLogInfo = e.getValue();
758
759             if (followerActor != null) {
760                 long nextIndex = followerLogInfo.getNextIndex();
761                 if (followerLogInfo.getInstallSnapshotState() != null
762                         || context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED
763                         || canInstallSnapshot(nextIndex)) {
764                     sendSnapshotChunk(followerActor, followerLogInfo);
765                 }
766             }
767         }
768     }
769
770     /**
771      *  Sends a snapshot chunk to a given follower
772      *  InstallSnapshot should qualify as a heartbeat too.
773      */
774     private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
775         if (snapshot.isPresent()) {
776             LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
777             if (installSnapshotState == null) {
778                 installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
779                         logName());
780                 followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
781             }
782
783             // Ensure the snapshot bytes are set - this is a no-op.
784             installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
785
786             byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
787
788             log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
789                     nextSnapshotChunk.length);
790
791             int nextChunkIndex = installSnapshotState.incrementChunkIndex();
792             Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
793             if (installSnapshotState.isLastChunk(nextChunkIndex)) {
794                 serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
795             }
796
797             followerActor.tell(
798                 new InstallSnapshot(currentTerm(), context.getId(),
799                     snapshot.get().getLastIncludedIndex(),
800                     snapshot.get().getLastIncludedTerm(),
801                     nextSnapshotChunk,
802                     nextChunkIndex,
803                     installSnapshotState.getTotalChunks(),
804                     Optional.of(installSnapshotState.getLastChunkHashCode()),
805                     serverConfig
806                 ).toSerializable(followerLogInfo.getRaftVersion()),
807                 actor()
808             );
809
810             log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
811                     installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
812         }
813     }
814
815     private void sendHeartBeat() {
816         if (!followerToLog.isEmpty()) {
817             log.trace("{}: Sending heartbeat", logName());
818             sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
819         }
820     }
821
822     private void stopHeartBeat() {
823         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
824             heartbeatSchedule.cancel();
825         }
826     }
827
828     private void scheduleHeartBeat(FiniteDuration interval) {
829         if (followerToLog.isEmpty()) {
830             // Optimization - do not bother scheduling a heartbeat as there are
831             // no followers
832             return;
833         }
834
835         stopHeartBeat();
836
837         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
838         // message is sent to itself.
839         // Scheduling the heartbeat only once here because heartbeats do not
840         // need to be sent if there are other messages being sent to the remote
841         // actor.
842         heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
843             interval, context.getActor(), SendHeartBeat.INSTANCE,
844             context.getActorSystem().dispatcher(), context.getActor());
845     }
846
847     @Override
848     public void close() {
849         stopHeartBeat();
850     }
851
852     @Override
853     public final String getLeaderId() {
854         return context.getId();
855     }
856
857     @Override
858     public final short getLeaderPayloadVersion() {
859         return context.getPayloadVersion();
860     }
861
862     protected boolean isLeaderIsolated() {
863         int minPresent = getMinIsolatedLeaderPeerCount();
864         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
865             final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
866             if (peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
867                 --minPresent;
868                 if (minPresent == 0) {
869                     return false;
870                 }
871             }
872         }
873         return minPresent != 0;
874     }
875
876     // called from example-actor for printing the follower-states
877     public String printFollowerStates() {
878         final StringBuilder sb = new StringBuilder();
879
880         sb.append('[');
881         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
882             sb.append('{');
883             sb.append(followerLogInformation.getId());
884             sb.append(" state:");
885             sb.append(followerLogInformation.isFollowerActive());
886             sb.append("},");
887         }
888         sb.append(']');
889
890         return sb.toString();
891     }
892
893     @VisibleForTesting
894     public FollowerLogInformation getFollower(String followerId) {
895         return followerToLog.get(followerId);
896     }
897
898     @VisibleForTesting
899     public int followerLogSize() {
900         return followerToLog.size();
901     }
902
903     private static class SnapshotHolder {
904         private final long lastIncludedTerm;
905         private final long lastIncludedIndex;
906         private final ByteString snapshotBytes;
907
908         SnapshotHolder(Snapshot snapshot) {
909             this.lastIncludedTerm = snapshot.getLastAppliedTerm();
910             this.lastIncludedIndex = snapshot.getLastAppliedIndex();
911             this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
912         }
913
914         long getLastIncludedTerm() {
915             return lastIncludedTerm;
916         }
917
918         long getLastIncludedIndex() {
919             return lastIncludedIndex;
920         }
921
922         ByteString getSnapshotBytes() {
923             return snapshotBytes;
924         }
925     }
926 }