/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.raft.behaviors;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import scala.concurrent.duration.FiniteDuration;
/**
 * The behavior of a RaftActor when it is in the Leader state.
 * <p>
 * Leaders:
 * <ul>
 * <li> Upon election: send initial empty AppendEntries RPCs
 * (heartbeat) to each server; repeat during idle periods to
 * prevent election timeouts (§5.2)
 * <li> If command received from client: append entry to local log,
 * respond after entry applied to state machine (§5.3)
 * <li> If last log index ≥ nextIndex for a follower: send
 * AppendEntries RPC with log entries starting at nextIndex
 * <ul>
 * <li> If successful: update nextIndex and matchIndex for
 * follower (§5.3)
 * <li> If AppendEntries fails because of log inconsistency:
 * decrement nextIndex and retry (§5.3)
 * </ul>
 * <li> If there exists an N such that N > commitIndex, a majority
 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
 * set commitIndex = N (§5.3, §5.4).
 * </ul>
 */
69 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
71 // The index of the first chunk that is sent when installing a snapshot
72 public static final int FIRST_CHUNK_INDEX = 1;
74 // The index that the follower should respond with if it needs the install snapshot to be reset
75 public static final int INVALID_CHUNK_INDEX = -1;
77 // This would be passed as the hash code of the last chunk when sending the first chunk
78 public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
80 private final Map<String, FollowerLogInformation> followerToLog;
81 private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
83 private Cancellable heartbeatSchedule = null;
85 private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
87 protected final int minReplicationCount;
89 protected final int minIsolatedLeaderPeerCount;
91 private Optional<ByteString> snapshot;
93 public AbstractLeader(RaftActorContext context) {
94 super(context, RaftState.Leader);
96 final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
97 for (String followerId : context.getPeerAddresses().keySet()) {
98 FollowerLogInformation followerLogInformation =
99 new FollowerLogInformationImpl(followerId, -1, context);
101 ftlBuilder.put(followerId, followerLogInformation);
103 followerToLog = ftlBuilder.build();
105 leaderId = context.getId();
107 LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
109 minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
111 // the isolated Leader peer count will be 1 less than the majority vote count.
112 // this is because the vote count has the self vote counted in it
114 // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
115 // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
116 // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
117 minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
119 snapshot = Optional.absent();
121 // Immediately schedule a heartbeat
122 // Upon election: send initial empty AppendEntries RPCs
123 // (heartbeat) to each server; repeat during idle periods to
124 // prevent election timeouts (§5.2)
125 sendAppendEntries(0, false);
127 // It is important to schedule this heartbeat here
128 scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
132 * Return an immutable collection of follower identifiers.
134 * @return Collection of follower IDs
136 public final Collection<String> getFollowerIds() {
137 return followerToLog.keySet();
141 void setSnapshot(Optional<ByteString> snapshot) {
142 this.snapshot = snapshot;
146 protected RaftActorBehavior handleAppendEntries(ActorRef sender,
147 AppendEntries appendEntries) {
149 LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
155 protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
156 AppendEntriesReply appendEntriesReply) {
158 if(LOG.isTraceEnabled()) {
159 LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
162 // Update the FollowerLogInformation
163 String followerId = appendEntriesReply.getFollowerId();
164 FollowerLogInformation followerLogInformation =
165 followerToLog.get(followerId);
167 if(followerLogInformation == null){
168 LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
172 if(followerLogInformation.timeSinceLastActivity() >
173 context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
174 LOG.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " +
175 "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
176 logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(),
177 context.getLastApplied(), context.getCommitIndex());
180 followerLogInformation.markFollowerActive();
181 followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
183 boolean updated = false;
184 if (appendEntriesReply.isSuccess()) {
185 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
187 LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
189 long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
190 ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
191 if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
192 followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
193 // The follower's log is empty or the last entry is present in the leader's journal
194 // and the terms match so the follower is just behind the leader's journal from
195 // the last snapshot, if any. We'll catch up the follower quickly by starting at the
196 // follower's last log index.
198 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
200 // TODO: When we find that the follower is out of sync with the
201 // Leader we simply decrement that followers next index by 1.
202 // Would it be possible to do better than this? The RAFT spec
203 // does not explicitly deal with it but may be something for us to
206 followerLogInformation.decrNextIndex();
210 // Now figure out if this reply warrants a change in the commitIndex
211 // If there exists an N such that N > commitIndex, a majority
212 // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
213 // set commitIndex = N (§5.3, §5.4).
214 for (long N = context.getCommitIndex() + 1; ; N++) {
215 int replicatedCount = 1;
217 for (FollowerLogInformation info : followerToLog.values()) {
218 if (info.getMatchIndex() >= N) {
223 if (replicatedCount >= minReplicationCount) {
224 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
225 if (replicatedLogEntry != null &&
226 replicatedLogEntry.getTerm() == currentTerm()) {
227 context.setCommitIndex(N);
234 // Apply the change to the state machine
235 if (context.getCommitIndex() > context.getLastApplied()) {
236 if(LOG.isDebugEnabled()) {
237 LOG.debug("{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
238 logName(), followerId, context.getCommitIndex(), context.getLastApplied());
241 applyLogToStateMachine(context.getCommitIndex());
244 if (!context.getSnapshotManager().isCapturing()) {
248 //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
249 sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
253 private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
254 AppendEntriesReply appendEntriesReply) {
255 boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
256 updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
258 if(updated && LOG.isDebugEnabled()) {
259 LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}",
260 logName(), followerLogInformation.getId(), followerLogInformation.getMatchIndex(),
261 followerLogInformation.getNextIndex());
266 private void purgeInMemoryLog() {
267 //find the lowest index across followers which has been replicated to all.
268 // lastApplied if there are no followers, so that we keep clearing the log for single-node
269 // we would delete the in-mem log from that index on, in-order to minimize mem usage
270 // we would also share this info thru AE with the followers so that they can delete their log entries as well.
271 long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
272 for (FollowerLogInformation info : followerToLog.values()) {
273 minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
276 super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
280 protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
281 final Iterator<ClientRequestTracker> it = trackerList.iterator();
282 while (it.hasNext()) {
283 final ClientRequestTracker t = it.next();
284 if (t.getIndex() == logIndex) {
294 protected ClientRequestTracker findClientRequestTracker(long logIndex) {
295 for (ClientRequestTracker tracker : trackerList) {
296 if (tracker.getIndex() == logIndex) {
304 protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
305 RequestVoteReply requestVoteReply) {
309 protected void beforeSendHeartbeat(){}
312 public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
313 Preconditions.checkNotNull(sender, "sender should not be null");
315 Object message = fromSerializableMessage(originalMessage);
317 if (message instanceof RaftRPC) {
318 RaftRPC rpc = (RaftRPC) message;
319 // If RPC request or response contains term T > currentTerm:
320 // set currentTerm = T, convert to follower (§5.1)
321 // This applies to all RPC messages and responses
322 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
323 LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
324 logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
326 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
328 return switchBehavior(new Follower(context));
332 if (message instanceof SendHeartBeat) {
333 beforeSendHeartbeat();
335 scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
338 } else if(message instanceof SendInstallSnapshot) {
339 // received from RaftActor
340 setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
341 sendInstallSnapshot();
343 } else if (message instanceof Replicate) {
344 replicate((Replicate) message);
346 } else if (message instanceof InstallSnapshotReply){
347 handleInstallSnapshotReply((InstallSnapshotReply) message);
352 return super.handleMessage(sender, message);
355 private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
356 LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
358 String followerId = reply.getFollowerId();
359 FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
361 if (followerToSnapshot == null) {
362 LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
363 logName(), followerId);
367 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
368 followerLogInformation.markFollowerActive();
370 if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
371 boolean wasLastChunk = false;
372 if (reply.isSuccess()) {
373 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
374 //this was the last chunk reply
375 if(LOG.isDebugEnabled()) {
376 LOG.debug("{}: InstallSnapshotReply received, " +
377 "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}",
378 logName(), reply.getChunkIndex(), followerId,
379 context.getReplicatedLog().getSnapshotIndex() + 1
383 followerLogInformation.setMatchIndex(
384 context.getReplicatedLog().getSnapshotIndex());
385 followerLogInformation.setNextIndex(
386 context.getReplicatedLog().getSnapshotIndex() + 1);
387 mapFollowerToSnapshot.remove(followerId);
389 LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
390 logName(), followerId, followerLogInformation.getMatchIndex(),
391 followerLogInformation.getNextIndex());
393 if (mapFollowerToSnapshot.isEmpty()) {
394 // once there are no pending followers receiving snapshots
395 // we can remove snapshot from the memory
396 setSnapshot(Optional.<ByteString>absent());
401 followerToSnapshot.markSendStatus(true);
404 LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
405 logName(), reply.getChunkIndex());
407 followerToSnapshot.markSendStatus(false);
410 if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
411 // Since the follower is now caught up try to purge the log.
413 } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
414 ActorSelection followerActor = context.getPeerActorSelection(followerId);
415 if(followerActor != null) {
416 sendSnapshotChunk(followerActor, followerId);
421 LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
422 logName(), reply.getChunkIndex(), followerId,
423 followerToSnapshot.getChunkIndex());
425 if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
426 // Since the Follower did not find this index to be valid we should reset the follower snapshot
427 // so that Installing the snapshot can resume from the beginning
428 followerToSnapshot.reset();
433 private void replicate(Replicate replicate) {
434 long logIndex = replicate.getReplicatedLogEntry().getIndex();
436 LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(),
437 replicate.getIdentifier(), logIndex);
439 // Create a tracker entry we will use this later to notify the
442 new ClientRequestTrackerImpl(replicate.getClientActor(),
443 replicate.getIdentifier(),
447 if (followerToLog.isEmpty()) {
448 context.setCommitIndex(logIndex);
449 applyLogToStateMachine(logIndex);
451 sendAppendEntries(0, false);
455 private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
456 // Send an AppendEntries to all followers
457 for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
458 final String followerId = e.getKey();
459 final FollowerLogInformation followerLogInformation = e.getValue();
460 // This checks helps not to send a repeat message to the follower
461 if(!followerLogInformation.isFollowerActive() ||
462 followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
463 sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat);
470 * This method checks if any update needs to be sent to the given follower. This includes append log entries,
471 * sending next snapshot chunk, and initiating a snapshot.
472 * @return true if any update is sent, false otherwise
475 private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
476 boolean sendHeartbeat, boolean isHeartbeat) {
478 ActorSelection followerActor = context.getPeerActorSelection(followerId);
479 if (followerActor != null) {
480 long followerNextIndex = followerLogInformation.getNextIndex();
481 boolean isFollowerActive = followerLogInformation.isFollowerActive();
482 boolean sendAppendEntries = false;
483 List<ReplicatedLogEntry> entries = Collections.emptyList();
485 if (mapFollowerToSnapshot.get(followerId) != null) {
486 // if install snapshot is in process , then sent next chunk if possible
487 if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
488 sendSnapshotChunk(followerActor, followerId);
489 } else if(sendHeartbeat) {
490 // we send a heartbeat even if we have not received a reply for the last chunk
491 sendAppendEntries = true;
494 long leaderLastIndex = context.getReplicatedLog().lastIndex();
495 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
497 if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) {
498 LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
499 logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
502 if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
504 LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
505 followerNextIndex, followerId);
507 if(followerLogInformation.okToReplicate()) {
508 // Try to send all the entries in the journal but not exceeding the max data size
509 // for a single AppendEntries message.
510 int maxEntries = (int) context.getReplicatedLog().size();
511 entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
512 context.getConfigParams().getSnapshotChunkSize());
513 sendAppendEntries = true;
515 } else if (isFollowerActive && followerNextIndex >= 0 &&
516 leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
517 // if the followers next index is not present in the leaders log, and
518 // if the follower is just not starting and if leader's index is more than followers index
519 // then snapshot should be sent
521 if (LOG.isDebugEnabled()) {
522 LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
523 "follower-nextIndex: %d, leader-snapshot-index: %d, " +
524 "leader-last-index: %d", logName(), followerId,
525 followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
528 // Send heartbeat to follower whenever install snapshot is initiated.
529 sendAppendEntries = true;
530 initiateCaptureSnapshot(followerId, followerNextIndex);
532 } else if(sendHeartbeat) {
533 // we send an AppendEntries, even if the follower is inactive
534 // in-order to update the followers timestamp, in case it becomes active again
535 sendAppendEntries = true;
540 if(sendAppendEntries) {
541 sendAppendEntriesToFollower(followerActor, followerNextIndex,
542 entries, followerId);
547 private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
548 List<ReplicatedLogEntry> entries, String followerId) {
549 AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
550 prevLogIndex(followerNextIndex),
551 prevLogTerm(followerNextIndex), entries,
552 context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
554 if(!entries.isEmpty() || LOG.isTraceEnabled()) {
555 LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
559 followerActor.tell(appendEntries.toSerializable(), actor());
563 * Install Snapshot works as follows
564 * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
565 * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
566 * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
567 * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
568 * 4. On complete, Follower sends back a InstallSnapshotReply.
569 * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
570 * and replenishes the memory by deleting the snapshot in Replicated log.
571 * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
572 * then send the existing snapshot in chunks to the follower.
574 * @param followerNextIndex
576 private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
577 if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
578 context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
580 if (snapshot.isPresent()) {
581 // if a snapshot is present in the memory, most likely another install is in progress
582 // no need to capture snapshot.
583 // This could happen if another follower needs an install when one is going on.
584 final ActorSelection followerActor = context.getPeerActorSelection(followerId);
585 sendSnapshotChunk(followerActor, followerId);
589 context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
590 this.getReplicatedToAllIndex(), followerId);
596 private void sendInstallSnapshot() {
597 LOG.debug("{}: sendInstallSnapshot", logName());
598 for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
599 ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
601 if (followerActor != null) {
602 long nextIndex = e.getValue().getNextIndex();
604 if (!context.getReplicatedLog().isPresent(nextIndex) &&
605 context.getReplicatedLog().isInSnapshot(nextIndex)) {
606 sendSnapshotChunk(followerActor, e.getKey());
613 * Sends a snapshot chunk to a given follower
614 * InstallSnapshot should qualify as a heartbeat too.
616 private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
618 if (snapshot.isPresent()) {
619 ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
621 // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
622 // followerId to the followerToSnapshot map.
623 FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
626 new InstallSnapshot(currentTerm(), context.getId(),
627 context.getReplicatedLog().getSnapshotIndex(),
628 context.getReplicatedLog().getSnapshotTerm(),
630 followerToSnapshot.incrementChunkIndex(),
631 followerToSnapshot.getTotalChunks(),
632 Optional.of(followerToSnapshot.getLastChunkHashCode())
637 if(LOG.isDebugEnabled()) {
638 LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
639 logName(), followerActor.path(), followerToSnapshot.getChunkIndex(),
640 followerToSnapshot.getTotalChunks());
643 } catch (IOException e) {
644 LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e);
649 * Acccepts snaphot as ByteString, enters into map for future chunks
650 * creates and return a ByteString chunk
652 private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
653 FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
654 if (followerToSnapshot == null) {
655 followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
656 mapFollowerToSnapshot.put(followerId, followerToSnapshot);
658 ByteString nextChunk = followerToSnapshot.getNextChunk();
660 LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
665 private void sendHeartBeat() {
666 if (!followerToLog.isEmpty()) {
667 LOG.trace("{}: Sending heartbeat", logName());
668 sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
672 private void stopHeartBeat() {
673 if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
674 heartbeatSchedule.cancel();
678 private void scheduleHeartBeat(FiniteDuration interval) {
679 if (followerToLog.isEmpty()) {
680 // Optimization - do not bother scheduling a heartbeat as there are
687 // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
688 // message is sent to itself.
689 // Scheduling the heartbeat only once here because heartbeats do not
690 // need to be sent if there are other messages being sent to the remote
692 heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
693 interval, context.getActor(), new SendHeartBeat(),
694 context.getActorSystem().dispatcher(), context.getActor());
698 public void close() throws Exception {
703 public String getLeaderId() {
704 return context.getId();
707 protected boolean isLeaderIsolated() {
708 int minPresent = minIsolatedLeaderPeerCount;
709 for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
710 if (followerLogInformation.isFollowerActive()) {
712 if (minPresent == 0) {
717 return (minPresent != 0);
721 * Encapsulates the snapshot bytestring and handles the logic of sending
724 protected class FollowerToSnapshot {
725 private final ByteString snapshotBytes;
726 private int offset = 0;
727 // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
728 private int replyReceivedForOffset;
729 // if replyStatus is false, the previous chunk is attempted
730 private boolean replyStatus = false;
731 private int chunkIndex;
732 private final int totalChunks;
733 private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
734 private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
736 public FollowerToSnapshot(ByteString snapshotBytes) {
737 this.snapshotBytes = snapshotBytes;
738 int size = snapshotBytes.size();
739 totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
740 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
741 if(LOG.isDebugEnabled()) {
742 LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
743 logName(), size, totalChunks);
745 replyReceivedForOffset = -1;
746 chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
749 public ByteString getSnapshotBytes() {
750 return snapshotBytes;
753 public int incrementOffset() {
755 // if prev chunk failed, we would want to sent the same chunk again
756 offset = offset + context.getConfigParams().getSnapshotChunkSize();
761 public int incrementChunkIndex() {
763 // if prev chunk failed, we would want to sent the same chunk again
764 chunkIndex = chunkIndex + 1;
769 public int getChunkIndex() {
773 public int getTotalChunks() {
777 public boolean canSendNextChunk() {
778 // we only send a false if a chunk is sent but we have not received a reply yet
779 return replyReceivedForOffset == offset;
782 public boolean isLastChunk(int chunkIndex) {
783 return totalChunks == chunkIndex;
786 public void markSendStatus(boolean success) {
788 // if the chunk sent was successful
789 replyReceivedForOffset = offset;
791 lastChunkHashCode = nextChunkHashCode;
793 // if the chunk sent was failure
794 replyReceivedForOffset = offset;
799 public ByteString getNextChunk() {
800 int snapshotLength = getSnapshotBytes().size();
801 int start = incrementOffset();
802 int size = context.getConfigParams().getSnapshotChunkSize();
803 if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
804 size = snapshotLength;
806 if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
807 size = snapshotLength - start;
812 LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
813 snapshotLength, start, size);
815 ByteString substring = getSnapshotBytes().substring(start, start + size);
816 nextChunkHashCode = substring.hashCode();
821 * reset should be called when the Follower needs to be sent the snapshot from the beginning
826 replyReceivedForOffset = offset;
827 chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
828 lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
831 public int getLastChunkHashCode() {
832 return lastChunkHashCode;
836 // called from example-actor for printing the follower-states
837 public String printFollowerStates() {
838 final StringBuilder sb = new StringBuilder();
841 for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
843 sb.append(followerLogInformation.getId());
844 sb.append(" state:");
845 sb.append(followerLogInformation.isFollowerActive());
850 return sb.toString();
854 public FollowerLogInformation getFollower(String followerId) {
855 return followerToLog.get(followerId);
859 protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
860 mapFollowerToSnapshot.put(followerId, snapshot);
864 public int followerSnapshotSize() {
865 return mapFollowerToSnapshot.size();
869 public int followerLogSize() {
870 return followerToLog.size();