2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.base.Throwables;
18 import com.google.common.io.ByteSource;
19 import java.io.IOException;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.LinkedList;
25 import java.util.List;
27 import java.util.Map.Entry;
28 import java.util.Queue;
29 import javax.annotation.Nullable;
30 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
31 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
32 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
33 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
34 import org.opendaylight.controller.cluster.raft.PeerInfo;
35 import org.opendaylight.controller.cluster.raft.RaftActorContext;
36 import org.opendaylight.controller.cluster.raft.RaftState;
37 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
38 import org.opendaylight.controller.cluster.raft.VotingState;
39 import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
40 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
41 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
42 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
43 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
44 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
45 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
46 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
47 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
48 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
49 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
50 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
51 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
52 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
53 import scala.concurrent.duration.FiniteDuration;
56 * The behavior of a RaftActor when it is in the Leader state
61 * <li> Upon election: send initial empty AppendEntries RPCs
62 * (heartbeat) to each server; repeat during idle periods to
63 * prevent election timeouts (§5.2)
64 * <li> If command received from client: append entry to local log,
65 * respond after entry applied to state machine (§5.3)
66 * <li> If last log index ≥ nextIndex for a follower: send
67 * AppendEntries RPC with log entries starting at nextIndex
68 * <li> If successful: update nextIndex and matchIndex for follower (§5.3)
70 * <li> If AppendEntries fails because of log inconsistency:
71 * decrement nextIndex and retry (§5.3)
72 * <li> If there exists an N such that N > commitIndex, a majority
73 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
74 * set commitIndex = N (§5.3, §5.4).
77 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
// Replication bookkeeping for each follower, keyed by follower id.
78 private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
81 * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
82 * expect the entries to be modified in sequence, hence we open-code the lookup.
83 * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
84 * but we already expect those to be far from frequent.
86 private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
// Heartbeat schedule handle; addFollower() schedules a heartbeat when this is still null.
88 private Cancellable heartbeatSchedule = null;
// Snapshot kept in memory for installing on followers; absent when none is held.
89 private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
// Number of votes (the leader's own included) an entry needs to reach consensus; recomputed by updateMinReplicaCount().
90 private int minReplicationCount;
/**
 * Creates the leader behavior, sends an initial round of AppendEntries to the followers and schedules the
 * first heartbeat.
 *
 * @param context the raft actor context
 * @param state the raft state this behavior represents
 * @param initializeFromLeader when non-null, its follower tracking state, held snapshot and pending client
 *        request trackers are carried over into this behavior
 */
92 protected AbstractLeader(RaftActorContext context, RaftState state,
93 @Nullable AbstractLeader initializeFromLeader) {
94 super(context, state);
96 if (initializeFromLeader != null) {
97 followerToLog.putAll(initializeFromLeader.followerToLog);
98 snapshotHolder = initializeFromLeader.snapshotHolder;
99 trackers.addAll(initializeFromLeader.trackers);
// Create tracking state for each configured peer. The -1 presumably means "match index unknown";
// confirm against FollowerLogInformationImpl.
101 for (PeerInfo peerInfo: context.getPeers()) {
102 FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
103 followerToLog.put(peerInfo.getId(), followerLogInformation);
107 log.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
109 updateMinReplicaCount();
111 // Immediately schedule a heartbeat
112 // Upon election: send initial empty AppendEntries RPCs
113 // (heartbeat) to each server; repeat during idle periods to
114 // prevent election timeouts (§5.2)
// An interval of 0 makes every follower eligible in sendAppendEntries (assuming timeSinceLastActivity()
// is never negative).
115 sendAppendEntries(0, false);
117 // It is important to schedule this heartbeat here
118 scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
/**
 * Creates a leader behavior that does not inherit any state from a previous leader behavior.
 *
 * @param context the raft actor context
 * @param state the raft state this behavior represents
 */
protected AbstractLeader(RaftActorContext context, RaftState state) {
    // No predecessor: delegate with an explicit null initializeFromLeader.
    this(context, state, (AbstractLeader) null);
}
126 * Return an immutable collection of follower identifiers.
128 * @return Collection of follower IDs
130 public final Collection<String> getFollowerIds() {
131 return followerToLog.keySet();
134 public void addFollower(String followerId) {
135 FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(
136 context.getPeerInfo(followerId), -1, context);
137 followerToLog.put(followerId, followerLogInformation);
139 if (heartbeatSchedule == null) {
140 scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
144 public void removeFollower(String followerId) {
145 followerToLog.remove(followerId);
/**
 * Recomputes {@code minReplicationCount} as {@code getMajorityVoteCount(numVoting)}. Here {@code numVoting} is
 * derived from the peers returned by {@code context.getPeers()} for which {@code isVoting()} is true.
 */
148 public void updateMinReplicaCount() {
150 for (PeerInfo peer: context.getPeers()) {
// Only voting peers are relevant to the majority computation.
151 if (peer.isVoting()) {
156 minReplicationCount = getMajorityVoteCount(numVoting);
159 protected int getMinIsolatedLeaderPeerCount() {
160 //the isolated Leader peer count will be 1 less than the majority vote count.
161 //this is because the vote count has the self vote counted in it
163 //0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
164 //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
165 //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
167 return minReplicationCount > 0 ? minReplicationCount - 1 : 0;
171 void setSnapshot(@Nullable SnapshotHolder snapshotHolder) {
172 this.snapshotHolder = Optional.fromNullable(snapshotHolder);
176 boolean hasSnapshot() {
177 return snapshotHolder.isPresent();
/**
 * Handles an AppendEntries received while in a leader state; the message is logged at debug level.
 *
 * @param sender the sender of the message
 * @param appendEntries the received AppendEntries
 * @return the behavior the actor should use after handling the message
 */
181 protected RaftActorBehavior handleAppendEntries(ActorRef sender,
182 AppendEntries appendEntries) {
184 log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
/**
 * Handles an AppendEntriesReply from a follower.
 *
 * <p>Replies from followers not present in {@code followerToLog} are logged as errors. Otherwise the follower is
 * marked active, its payload and raft versions are recorded from the reply, and its matchIndex/nextIndex are
 * adjusted:
 * <ul>
 * <li>the follower's last index is beyond the leader's last index: both indexes are reset to -1 and a snapshot
 * capture is initiated for the follower;</li>
 * <li>a successful reply whose last entry term conflicts with the leader's term at that index: nextIndex is set to
 * the follower's last index - 1;</li>
 * <li>other successful replies: match/next index are taken from the reply;</li>
 * <li>an unsuccessful reply with forceInstallSnapshot: both indexes are reset to -1 and a snapshot capture is
 * initiated;</li>
 * <li>an unsuccessful reply where the follower's log is empty or its last entry matches the leader's term:
 * match/next index are taken from the reply;</li>
 * <li>any other unsuccessful reply: nextIndex is decremented.</li>
 * </ul>
 * Afterwards the commit index is re-evaluated and the follower is sent its next update right away.
 *
 * @param sender the actor that sent the reply
 * @param appendEntriesReply the follower's reply
 * @return the behavior the actor should use after handling the reply
 */
190 protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
191 log.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
193 // Update the FollowerLogInformation
194 String followerId = appendEntriesReply.getFollowerId();
195 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
197 if (followerLogInformation == null) {
198 log.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
// Warn when this reply arrives more than an election timeout (in millis) after the follower's last recorded activity.
202 if (followerLogInformation.timeSinceLastActivity()
203 > context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
204 log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, "
205 + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
206 logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(),
207 context.getLastApplied(), context.getCommitIndex());
210 followerLogInformation.markFollowerActive();
211 followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
212 followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
214 long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
215 long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
216 boolean updated = false;
217 if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
218 // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
219 // in raft as a node cannot become leader if its log is behind another's. However, the
220 // non-voting semantics deviate a bit from raft. Only voting members participate in
221 // elections and can become leader so it's possible for a non-voting follower to be ahead
222 // of the leader. This can happen if persistence is disabled and all voting members are
223 // restarted. In this case, the voting leader will start out with an empty log however
224 // the non-voting followers still retain the previous data in memory. On the first
225 // AppendEntries, the non-voting follower returns a successful reply b/c the prevLogIndex
226 // sent by the leader is -1 and thus the integrity checks pass. However the follower's returned
227 // lastLogIndex may be higher in which case we want to reset the follower by installing a
228 // snapshot. It's also possible that the follower's last log index is behind the leader's.
229 // However in this case the log terms won't match and the logs will conflict - this is handled
// by the term-conflict handling in the branches below.
231 log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
232 + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
233 appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
234 context.getReplicatedLog().getSnapshotIndex());
236 followerLogInformation.setMatchIndex(-1);
237 followerLogInformation.setNextIndex(-1);
239 initiateCaptureSnapshot(followerId);
242 } else if (appendEntriesReply.isSuccess()) {
243 if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0
244 && followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
245 // The follower's last entry is present in the leader's journal but the terms don't match so the
246 // follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means
247 // either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot
248 // and no longer in the journal. Either way, we set the follower's next index to 1 less than the last
249 // index reported by the follower. For the former case, the leader will send all entries starting with
250 // the previous follower's index and the follower will remove and replace the conflicting entries as
251 // needed. For the latter, the leader will initiate an install snapshot.
253 followerLogInformation.setNextIndex(followerLastLogIndex - 1);
256 log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
257 + "leader's {} - set the follower's next index to {}", logName(),
258 followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
259 followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
261 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
264 log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
265 logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
267 if (appendEntriesReply.isForceInstallSnapshot()) {
268 // Reset the followers match and next index. This is to signal that this follower has nothing
269 // in common with this Leader and so would require a snapshot to be installed
270 followerLogInformation.setMatchIndex(-1);
271 followerLogInformation.setNextIndex(-1);
273 // Force initiate a snapshot capture
274 initiateCaptureSnapshot(followerId);
// Note: && binds tighter than ||, so this reads: the follower's log is empty, OR its last entry is present
// in our log with a matching term.
275 } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0
276 && followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
277 // The follower's log is empty or the last entry is present in the leader's journal
278 // and the terms match so the follower is just behind the leader's journal from
279 // the last snapshot, if any. We'll catch up the follower quickly by starting at the
280 // follower's last log index.
282 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
// NOTE(review): this format string has 4 placeholders but 5 arguments follow. The extra getLogLastTerm()
// shifts the output: "matchIndex" prints the term, "nextIndex" prints the matchIndex, and the real nextIndex
// is dropped. The term argument should probably be removed.
284 log.info("{}: follower {} appears to be behind the leader from the last snapshot - "
285 + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
286 appendEntriesReply.getLogLastTerm(), followerLogInformation.getMatchIndex(),
287 followerLogInformation.getNextIndex());
289 // The follower's log conflicts with leader's log so decrement follower's next index by 1
290 // in an attempt to find where the logs match.
292 if (followerLogInformation.decrNextIndex()) {
295 log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
296 logName(), followerId, appendEntriesReply.getLogLastTerm(),
297 followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
302 if (log.isTraceEnabled()) {
303 log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
304 logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
307 possiblyUpdateCommitIndex();
309 //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
// isHeartbeat = !updated: when nothing changed for this follower, the send is treated like a heartbeat.
// sendUpdatesToFollower uses that flag to decide debug-log verbosity.
310 sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
/**
 * Advances the commit index when Raft's commit rule allows it (§5.3, §5.4) and applies newly committed entries.
 *
 * <p>Journal entries are examined starting at commitIndex + 1. For each entry:
 * <ul>
 * <li>the leader counts itself only if the entry's local persistence is no longer pending;</li>
 * <li>a follower is considered only if its matchIndex has reached the entry and its PeerInfo is known and
 * voting.</li>
 * </ul>
 * When the count reaches {@code minReplicationCount}, the commit index moves to that entry, but only if the
 * entry belongs to the current term (§5.4.1).
 *
 * <p>Afterwards, if commitIndex exceeds lastApplied, entries up to commitIndex are applied to the state machine.
 */
315 private void possiblyUpdateCommitIndex() {
316 // Figure out if we can update the commitIndex as follows:
317 // If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
318 // and log[N].term == currentTerm:
319 // set commitIndex = N (§5.3, §5.4).
// The loop has no condition. Per the trace messages below, it presumably exits at the first missing entry
// or when consensus is not reached.
320 for (long index = context.getCommitIndex() + 1; ; index++) {
321 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
322 if (replicatedLogEntry == null) {
323 log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
324 logName(), index, context.getReplicatedLog().getSnapshotIndex(),
325 context.getReplicatedLog().size());
329 // Count our entry if it has been persisted.
330 int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
332 if (replicatedCount == 0) {
333 // We don't commit and apply a log entry until we've gotten the ack from our local persistence,
334 // even though there *shouldn't* be any issue with updating the commit index if we get a consensus
335 // amongst the followers w/o the local persistence ack.
339 log.trace("{}: checking Nth index {}", logName(), index);
340 for (FollowerLogInformation info : followerToLog.values()) {
341 final PeerInfo peerInfo = context.getPeerInfo(info.getId());
// Only voting followers with known PeerInfo whose matchIndex has reached this entry count toward consensus.
342 if (info.getMatchIndex() >= index && peerInfo != null && peerInfo.isVoting()) {
344 } else if (log.isTraceEnabled()) {
345 log.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
346 info.getMatchIndex(), peerInfo);
350 if (log.isTraceEnabled()) {
351 log.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount,
352 minReplicationCount);
355 if (replicatedCount >= minReplicationCount) {
356 // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
357 // "Raft never commits log entries from previous terms by counting replicas".
358 // However we keep looping so we can make progress when new entries in the current term
359 // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
360 // counting replicas, then all prior entries are committed indirectly".
361 if (replicatedLogEntry.getTerm() == currentTerm()) {
362 log.trace("{}: Setting commit index to {}", logName(), index);
363 context.setCommitIndex(index);
365 log.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, "
366 + "term {} does not match the current term {}", logName(), index,
367 replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
370 log.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount);
375 // Apply the change to the state machine
376 if (context.getCommitIndex() > context.getLastApplied()) {
377 log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(),
378 context.getCommitIndex(), context.getLastApplied());
380 applyLogToStateMachine(context.getCommitIndex());
// Follow-up work is skipped while a snapshot capture is in progress. handleInstallSnapshotReply has the same
// guard around purging the in-memory log.
383 if (!context.getSnapshotManager().isCapturing()) {
/**
 * Records a follower's progress from its AppendEntriesReply. matchIndex becomes the reply's last log index and
 * nextIndex becomes that index + 1. When either setter reports a change, the new values are logged at debug
 * level.
 *
 * @param followerLogInformation the follower's tracking state to update
 * @param appendEntriesReply the reply carrying the follower's last log index
 * @return the {@code updated} flag (presumably true when either index changed)
 */
388 private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
389 AppendEntriesReply appendEntriesReply) {
390 boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
// setNextIndex is on the left of || so it always runs, even when setMatchIndex already reported a change.
391 updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
393 if (updated && log.isDebugEnabled()) {
395 "{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
396 logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
397 followerLogInformation.getNextIndex());
402 private void purgeInMemoryLog() {
403 //find the lowest index across followers which has been replicated to all.
404 // lastApplied if there are no followers, so that we keep clearing the log for single-node
405 // we would delete the in-mem log from that index on, in-order to minimize mem usage
406 // we would also share this info thru AE with the followers so that they can delete their log entries as well.
407 long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
408 for (FollowerLogInformation info : followerToLog.values()) {
409 minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
412 super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
/**
 * Looks up the client request tracker registered for the given journal index.
 *
 * <p>{@code trackers} is scanned linearly from its head, because entries are expected to be modified in sequence
 * (see the comment on the {@code trackers} field). The match is therefore normally at or near the head. Judging
 * by the method name, a matching tracker is also removed from the queue — confirm against the full body.
 *
 * @param logIndex the journal index whose tracker is wanted
 * @return the matching tracker (presumably null when none matches — confirm)
 */
416 protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
417 final Iterator<ClientRequestTracker> it = trackers.iterator();
418 while (it.hasNext()) {
419 final ClientRequestTracker t = it.next();
420 if (t.getIndex() == logIndex) {
/**
 * Handles a RequestVoteReply received while in a leader state.
 *
 * @param sender the sender of the reply
 * @param requestVoteReply the received reply
 * @return the behavior the actor should use after handling the reply
 */
430 protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
431 RequestVoteReply requestVoteReply) {
435 protected void beforeSendHeartbeat(){}
/**
 * Entry point for messages delivered to the leader behavior.
 *
 * <p>A RaftRPC carrying a term greater than the current one makes the leader persist that term and switch to
 * Follower. While a leadership transfer is in progress, a RequestVote is first processed by the base behavior.
 * SendHeartBeat, SendInstallSnapshot, Replicate, InstallSnapshotReply and CheckConsensusReached are handled
 * here. Other messages end up in {@code super.handleMessage}.
 *
 * @param sender the sender of the message; must not be null
 * @param message the message to handle
 * @return the behavior the actor should use after handling the message
 * @throws NullPointerException if {@code sender} is null
 */
438 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
439 Preconditions.checkNotNull(sender, "sender should not be null");
441 if (message instanceof RaftRPC) {
442 RaftRPC rpc = (RaftRPC) message;
443 // If RPC request or response contains term T > currentTerm:
444 // set currentTerm = T, convert to follower (§5.1)
445 // This applies to all RPC messages and responses
446 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
447 log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
448 logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
// Adopt the newer term. The null second argument presumably clears votedFor — confirm with the term API.
450 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
452 // This is a special case. Normally when stepping down as leader we don't process and reply to the
453 // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
454 // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
455 // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
456 // state and starting a new election and grabbing leadership back before the other candidate node can
457 // start a new election due to lack of responses. This case would only occur if there isn't a majority
458 // of other nodes available that can elect the requesting candidate. Since we're transferring
459 // leadership, we should make every effort to get the requesting node elected.
460 if (message instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
461 log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
462 super.handleMessage(sender, message);
465 return internalSwitchBehavior(RaftState.Follower);
469 if (message instanceof SendHeartBeat) {
470 beforeSendHeartbeat();
// Re-arm the heartbeat timer for the next interval.
472 scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
473 } else if (message instanceof SendInstallSnapshot) {
474 SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
// Keep the captured snapshot in memory so it can be sent in chunks to the followers that need it.
475 setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes()));
476 sendInstallSnapshot();
477 } else if (message instanceof Replicate) {
478 replicate((Replicate) message);
479 } else if (message instanceof InstallSnapshotReply) {
480 handleInstallSnapshotReply((InstallSnapshotReply) message);
481 } else if (message instanceof CheckConsensusReached) {
482 possiblyUpdateCommitIndex();
// Fall back to the base behavior for messages not handled above.
484 return super.handleMessage(sender, message);
/**
 * Handles a follower's reply to an InstallSnapshot chunk.
 *
 * <p>Replies from unknown followers, or from followers with no install in progress, are logged as errors. If the
 * reply is for the chunk currently outstanding:
 * <ul>
 * <li>success on the last chunk: the follower's matchIndex is set to the held snapshot's last included index and
 * nextIndex to one past it, and its install state is cleared. If the follower's voting state is
 * VOTING_NOT_INITIALIZED, an UnInitializedFollowerSnapshotReply is also sent to this actor;</li>
 * <li>success on an earlier chunk: the chunk is marked as successfully sent;</li>
 * <li>failure: the chunk is marked as failed so it will be retried.</li>
 * </ul>
 * The next chunk is then sent if the follower's actor selection is available.
 *
 * <p>A reply for a different chunk is logged as an error. If it carries INVALID_CHUNK_INDEX, the install is reset
 * to start from the beginning.
 *
 * @param reply the follower's reply
 */
490 private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
491 log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
493 String followerId = reply.getFollowerId();
494 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
495 if (followerLogInformation == null) {
496 // This can happen during AddServer if it times out.
497 log.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
498 logName(), followerId);
502 LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
503 if (installSnapshotState == null) {
504 log.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply",
505 logName(), followerId);
509 followerLogInformation.markFollowerActive();
511 if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
512 boolean wasLastChunk = false;
513 if (reply.isSuccess()) {
514 if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
515 //this was the last chunk reply
// NOTE(review): Guava Optional.get() throws IllegalStateException when absent. This assumes the snapshot is
// still held in memory while any follower install is in progress.
517 long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
518 followerLogInformation.setMatchIndex(followerMatchIndex);
519 followerLogInformation.setNextIndex(followerMatchIndex + 1);
520 followerLogInformation.clearLeaderInstallSnapshotState();
522 log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - "
523 + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(),
524 followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
526 if (!anyFollowersInstallingSnapshot()) {
527 // once there are no pending followers receiving snapshots
528 // we can remove snapshot from the memory
// NOTE(review): getPeerInfo(followerId) is dereferenced without a null check, unlike
// possiblyUpdateCommitIndex, which guards peerInfo != null.
533 if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
534 UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
535 new UnInitializedFollowerSnapshotReply(followerId);
536 context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor());
537 log.debug("Sent message UnInitializedFollowerSnapshotReply to self");
540 installSnapshotState.markSendStatus(true);
543 log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
545 installSnapshotState.markSendStatus(false);
549 if (!context.getSnapshotManager().isCapturing()) {
550 // Since the follower is now caught up try to purge the log.
554 ActorSelection followerActor = context.getPeerActorSelection(followerId);
555 if (followerActor != null) {
556 sendSnapshotChunk(followerActor, followerLogInformation);
561 log.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
562 logName(), reply.getChunkIndex(), followerId,
563 installSnapshotState.getChunkIndex());
565 if (reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX) {
566 // Since the Follower did not find this index to be valid we should reset the follower snapshot
567 // so that Installing the snapshot can resume from the beginning
568 installSnapshotState.reset();
/**
 * Checks whether any tracked follower still has a LeaderInstallSnapshotState, i.e. a snapshot install in
 * progress.
 *
 * @return presumably true as soon as one follower has a non-null install state, false otherwise
 */
573 private boolean anyFollowersInstallingSnapshot() {
574 for (FollowerLogInformation info: followerToLog.values()) {
575 if (info.getInstallSnapshotState() != null) {
/**
 * Handles a Replicate message for a replicated log entry.
 *
 * <ul>
 * <li>If the message has a client actor, a ClientRequestTrackerImpl is queued so the client can be notified
 * later.</li>
 * <li>When there are no voting peers, or the raft policy applies modifications before consensus, the commit index
 * is moved to the entry and the entry is applied to the state machine immediately.</li>
 * <li>If the message asks for immediate sending and there are followers, AppendEntries are sent right away
 * instead of waiting for the next heartbeat.</li>
 * </ul>
 *
 * @param replicate the Replicate message
 */
584 private void replicate(Replicate replicate) {
585 long logIndex = replicate.getReplicatedLogEntry().getIndex();
587 log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(),
588 replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(),
589 replicate.isSendImmediate());
591 // Create a tracker entry that is used later to notify the client actor
593 if (replicate.getClientActor() != null) {
594 trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(),
// Without voting peers there is nobody to reach consensus with, so the entry is committed right away.
598 boolean applyModificationToState = !context.anyVotingPeers()
599 || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
601 if (applyModificationToState) {
602 context.setCommitIndex(logIndex);
603 applyLogToStateMachine(logIndex);
606 if (replicate.isSendImmediate() && !followerToLog.isEmpty()) {
// An interval of 0 makes every follower eligible in sendAppendEntries.
607 sendAppendEntries(0, false);
611 protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
612 // Send an AppendEntries to all followers
613 for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
614 final String followerId = e.getKey();
615 final FollowerLogInformation followerLogInformation = e.getValue();
616 // This checks helps not to send a repeat message to the follower
617 if (!followerLogInformation.isFollowerActive()
618 || followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
619 sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat);
625 * This method checks if any update needs to be sent to the given follower. This includes append log entries,
626 * sending next snapshot chunk, and initiating a snapshot.
/**
 * Decides what, if anything, to send to one follower, and sends it. Nothing is sent when the follower has no
 * actor selection.
 *
 * <ul>
 * <li>Install in progress: send the next snapshot chunk if the follower is active and a chunk can be sent.
 * Otherwise send an AppendEntries if {@code sendHeartbeat} is set.</li>
 * <li>Follower active and its nextIndex present in the journal: if okToReplicate(), send the entries from
 * nextIndex, limited by the configured snapshot chunk size in data. If okToReplicate() is false, nothing is
 * sent.</li>
 * <li>Follower active, nextIndex >= 0 but not in the journal, leader ahead of it, and no capture in progress:
 * send an AppendEntries and, if canInstallSnapshot(nextIndex), initiate a snapshot capture for it.</li>
 * <li>Otherwise, if {@code sendHeartbeat} is set: send an empty AppendEntries.</li>
 * </ul>
 *
 * @param followerId the follower to update
 * @param followerLogInformation the follower's tracking state
 * @param sendHeartbeat whether an (empty) AppendEntries should be sent when there is nothing else to send
 * @param isHeartbeat whether this call is part of a heartbeat; used here to tone down debug logging
 */
628 private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
629 boolean sendHeartbeat, boolean isHeartbeat) {
631 ActorSelection followerActor = context.getPeerActorSelection(followerId);
632 if (followerActor != null) {
633 long followerNextIndex = followerLogInformation.getNextIndex();
634 boolean isFollowerActive = followerLogInformation.isFollowerActive();
635 boolean sendAppendEntries = false;
636 List<ReplicatedLogEntry> entries = Collections.emptyList();
638 LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
639 if (installSnapshotState != null) {
640 // if install snapshot is in progress, then send the next chunk if possible
641 if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
642 sendSnapshotChunk(followerActor, followerLogInformation);
643 } else if (sendHeartbeat) {
644 // we send a heartbeat even if we have not received a reply for the last chunk
645 sendAppendEntries = true;
648 long leaderLastIndex = context.getReplicatedLog().lastIndex();
649 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
// && binds tighter than ||: log for non-heartbeat calls at debug level, or for every call at trace level.
651 if (!isHeartbeat && log.isDebugEnabled() || log.isTraceEnabled()) {
652 log.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, "
653 + "leaderLastIndex: {}, leaderSnapShotIndex: {}", logName(), followerId, isFollowerActive,
654 followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
657 if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
659 log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
660 followerNextIndex, followerId);
662 if (followerLogInformation.okToReplicate()) {
663 // Try to send all the entries in the journal but not exceeding the max data size
664 // for a single AppendEntries message.
665 int maxEntries = (int) context.getReplicatedLog().size();
666 entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
667 context.getConfigParams().getSnapshotChunkSize());
668 sendAppendEntries = true;
670 } else if (isFollowerActive && followerNextIndex >= 0
671 && leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
672 // if the followers next index is not present in the leaders log, and
673 // if the follower is just not starting and if leader's index is more than followers index
674 // then snapshot should be sent
676 // Send heartbeat to follower whenever install snapshot is initiated.
677 sendAppendEntries = true;
678 if (canInstallSnapshot(followerNextIndex)) {
679 log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
680 + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
681 followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
682 context.getReplicatedLog().size());
684 initiateCaptureSnapshot(followerId);
686 // It doesn't seem like we should ever reach here - most likely indicates something is wrong.
688 log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
689 + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
690 followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
691 context.getReplicatedLog().size());
694 } else if (sendHeartbeat) {
695 // we send an AppendEntries, even if the follower is inactive
696 // in-order to update the followers timestamp, in case it becomes active again
697 sendAppendEntries = true;
702 if (sendAppendEntries) {
703 sendAppendEntriesToFollower(followerActor, entries, followerLogInformation);
/**
 * Builds an AppendEntries carrying {@code entries} and sends it to the follower. The message uses the index and
 * term of the entry at the follower's nextIndex - 1 (via getLogEntryIndex/getLogEntryTerm), the current term,
 * the replicatedToAll index and the payload version.
 *
 * @param followerActor where to send the message
 * @param entries the entries to send; empty when the message only serves as a heartbeat
 * @param followerLogInformation the follower's tracking state
 */
708 private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
709 FollowerLogInformation followerLogInformation) {
710 // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
711 // possibly committing and applying conflicting entries (those with same index, different term) from a prior
712 // term that weren't replicated to a majority, which would be a violation of raft.
713 // - if the follower isn't active. In this case we don't know the state of the follower and we send an
714 // empty AppendEntries as a heart beat to prevent election.
715 // - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
716 // need to send AppendEntries to prevent election.
717 boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
// -1 is sent in place of the real commit index in the two cases described above.
718 long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 :
719 context.getCommitIndex();
721 long followerNextIndex = followerLogInformation.getNextIndex();
722 AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
723 getLogEntryIndex(followerNextIndex - 1),
724 getLogEntryTerm(followerNextIndex - 1), entries,
725 leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
// Log non-empty messages; empty (heartbeat) messages are logged only when trace is enabled.
727 if (!entries.isEmpty() || log.isTraceEnabled()) {
728 log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
732 followerActor.tell(appendEntries, actor());
736 * Initiates a snapshot capture to install on a follower.
739 * Install Snapshot works as follows
740 * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
741 * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
742 * the Leader's handleMessage with a SendInstallSnapshot message.
743 * 3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
744 * the Follower via InstallSnapshot messages.
745 * 4. For each chunk, the Follower sends back an InstallSnapshotReply.
746 * 5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
748 * 6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
749 * then send the existing snapshot in chunks to the follower.
751 * @param followerId the id of the follower.
752 * @return true if capture was initiated, false otherwise.
754 public boolean initiateCaptureSnapshot(String followerId) {
// NOTE(review): followerToLog.get() presumably returns null for an untracked followerId, and followerLogInfo
// is dereferenced below (directly or via sendSnapshotChunk) without a null check; confirm callers only pass
// tracked followers.
755 FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
756 if (snapshotHolder.isPresent()) {
757 // If a snapshot is present in memory, most likely another install is in progress, so there is no need to
758 // capture a new snapshot. This can happen if another follower needs an install while one is going on.
759 final ActorSelection followerActor = context.getPeerActorSelection(followerId);
761 // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
762 sendSnapshotChunk(followerActor, followerLogInfo);
// Ask the snapshot manager to capture a snapshot for installing on this follower, based on the last entry of
// the replicated log and the current replicated-to-all index.
766 boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
767 this.getReplicatedToAllIndex(), followerId);
// Only an initiated capture gets a fresh LeaderInstallSnapshotState, configured with the snapshot chunk size.
768 if (captureInitiated) {
769 followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
770 context.getConfigParams().getSnapshotChunkSize(), logName()));
773 return captureInitiated;
// Decides whether a follower with the given nextIndex should be brought up to date with a snapshot.
776 private boolean canInstallSnapshot(long nextIndex) {
777 // If the follower's nextIndex is -1 then we might as well send it a snapshot
778 // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// Precedence: && binds tighter than ||, so this evaluates as
// nextIndex == -1 || (!isPresent(nextIndex) && isInSnapshot(nextIndex)).
780 return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex)
781 && context.getReplicatedLog().isInSnapshot(nextIndex);
// Calls sendSnapshotChunk for each follower whose actor selection is available and that meets any of these
// conditions: it already has an install in progress, its voting state is VOTING_NOT_INITIALIZED, or
// canInstallSnapshot(nextIndex) holds.
786 private void sendInstallSnapshot() {
787 log.debug("{}: sendInstallSnapshot", logName());
788 for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
789 String followerId = e.getKey();
790 ActorSelection followerActor = context.getPeerActorSelection(followerId);
791 FollowerLogInformation followerLogInfo = e.getValue();
793 if (followerActor != null) {
794 long nextIndex = followerLogInfo.getNextIndex();
// NOTE(review): context.getPeerInfo(followerId) is dereferenced here without a null check, whereas
// isLeaderIsolated() guards against a null PeerInfo; confirm every tracked follower has PeerInfo.
795 if (followerLogInfo.getInstallSnapshotState() != null
796 || context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED
797 || canInstallSnapshot(nextIndex)) {
798 sendSnapshotChunk(followerActor, followerLogInfo);
805 * Sends a snapshot chunk to a given follower.
806 * InstallSnapshot should qualify as a heartbeat too.
808 private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
// All of the work below is conditional on a snapshot being held in snapshotHolder.
809 if (snapshotHolder.isPresent()) {
810 LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
// If the follower has no install state yet, create one (configured with the snapshot chunk size) and attach it.
811 if (installSnapshotState == null) {
812 installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
814 followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
818 // Ensure the snapshot bytes are set - this is a no-op.
819 installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
821 if (!installSnapshotState.canSendNextChunk()) {
825 byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
827 log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
828 nextSnapshotChunk.length);
830 int nextChunkIndex = installSnapshotState.incrementChunkIndex();
// serverConfig is populated (from context.getPeerServerInfo(true)) only when this is the last chunk.
831 Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
832 if (installSnapshotState.isLastChunk(nextChunkIndex)) {
833 serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
837 new InstallSnapshot(currentTerm(), context.getId(),
838 snapshotHolder.get().getLastIncludedIndex(),
839 snapshotHolder.get().getLastIncludedTerm(),
842 installSnapshotState.getTotalChunks(),
843 Optional.of(installSnapshotState.getLastChunkHashCode()),
845 ).toSerializable(followerLogInfo.getRaftVersion()),
849 log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
850 installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
// NOTE(review): Throwables.propagate is deprecated in newer Guava releases; wrapping explicitly
// (e.g. new UncheckedIOException(e)) would be the modern equivalent.
851 } catch (IOException e) {
852 throw Throwables.propagate(e);
// Sends a heartbeat round through sendAppendEntries, passing the configured heartbeat interval in milliseconds.
// This happens only when followerToLog is non-empty. The `true` argument presumably marks the call as a
// heartbeat; confirm against sendAppendEntries.
857 private void sendHeartBeat() {
858 if (!followerToLog.isEmpty()) {
859 log.trace("{}: Sending heartbeat", logName());
860 sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
// Cancels the pending heartbeat schedule, unless there is none or it has already been cancelled.
864 private void stopHeartBeat() {
865 if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
866 heartbeatSchedule.cancel();
// Schedules a one-shot SendHeartBeat message to context.getActor() after `interval`. The result is kept in
// heartbeatSchedule.
870 private void scheduleHeartBeat(FiniteDuration interval) {
871 if (followerToLog.isEmpty()) {
872 // Optimization - do not bother scheduling a heartbeat as there are
879 // Schedule a heartbeat. When the scheduler fires, a SendHeartBeat
880 // message is sent to itself.
881 // Scheduling the heartbeat only once here because heartbeats do not
882 // need to be sent if there are other messages being sent to the remote
884 heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
885 interval, context.getActor(), SendHeartBeat.INSTANCE,
886 context.getActorSystem().dispatcher(), context.getActor());
// Closes this leader behavior (presumably releasing its scheduled heartbeat; confirm against the body).
890 public void close() {
// The leader id reported by this behavior is this member's own id.
895 public final String getLeaderId() {
896 return context.getId();
// The leader payload version reported by this behavior is this member's own payload version.
900 public final short getLeaderPayloadVersion() {
901 return context.getPayloadVersion();
// Determines whether this leader should be considered isolated. minPresent starts at
// getMinIsolatedLeaderPeerCount(). Only followers that have known PeerInfo, are voting, and are active are
// counted against it. The method reports isolation (true) when minPresent is non-zero after the loop.
904 protected boolean isLeaderIsolated() {
905 int minPresent = getMinIsolatedLeaderPeerCount();
906 for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
907 final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
908 if (peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
910 if (minPresent == 0) {
915 return minPresent != 0;
918 // Called from example-actor to print the follower states.
// Builds a string listing each tracked follower's id together with whether it is currently active.
919 public String printFollowerStates() {
920 final StringBuilder sb = new StringBuilder();
923 for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
925 sb.append(followerLogInformation.getId());
926 sb.append(" state:");
927 sb.append(followerLogInformation.isFollowerActive());
932 return sb.toString();
// Returns the tracked FollowerLogInformation for followerId (presumably null when the id is not tracked).
936 public FollowerLogInformation getFollower(String followerId) {
937 return followerToLog.get(followerId);
// Number of followers currently tracked in followerToLog.
941 public int followerLogSize() {
942 return followerToLog.size();
945 static class SnapshotHolder {
946 private final long lastIncludedTerm;
947 private final long lastIncludedIndex;
948 private final ByteSource snapshotBytes;
950 SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
951 this.lastIncludedTerm = snapshot.getLastAppliedTerm();
952 this.lastIncludedIndex = snapshot.getLastAppliedIndex();
953 this.snapshotBytes = snapshotBytes;
956 long getLastIncludedTerm() {
957 return lastIncludedTerm;
960 long getLastIncludedIndex() {
961 return lastIncludedIndex;
964 ByteSource getSnapshotBytes() {
965 return snapshotBytes;