2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.protobuf.ByteString;
18 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
19 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
20 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
21 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
22 import org.opendaylight.controller.cluster.raft.RaftActorContext;
23 import org.opendaylight.controller.cluster.raft.RaftState;
24 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
25 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
26 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
27 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
28 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
29 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
30 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
32 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
33 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
35 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
36 import scala.concurrent.duration.FiniteDuration;
38 import java.io.IOException;
39 import java.util.ArrayList;
40 import java.util.Collections;
41 import java.util.HashMap;
42 import java.util.List;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicLong;
49 * The behavior of a RaftActor when it is in the Leader state
53 * <li> Upon election: send initial empty AppendEntries RPCs
54 * (heartbeat) to each server; repeat during idle periods to
55 * prevent election timeouts (§5.2)
56 * <li> If command received from client: append entry to local log,
57 * respond after entry applied to state machine (§5.3)
58 * <li> If last log index ≥ nextIndex for a follower: send
59 * AppendEntries RPC with log entries starting at nextIndex
61 * <li> If successful: update nextIndex and matchIndex for
63 * <li> If AppendEntries fails because of log inconsistency:
64 * decrement nextIndex and retry (§5.3)
66 * <li> If there exists an N such that N > commitIndex, a majority
67 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
68 * set commitIndex = N (§5.3, §5.4).
70 public class Leader extends AbstractRaftActorBehavior {
// Replication bookkeeping for each follower, keyed by follower id. Filled in by the constructor.
73 protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
// Snapshot transfers in progress, keyed by follower id. An entry is added when the first chunk is
// requested for a follower and removed when the reply for the last chunk arrives.
74 protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
// Ids of the peers this leader replicates to (the key set of context.getPeerAddresses()).
76 private final Set<String> followers;
// Handle of the one-shot SendHeartBeat timer; null until a heartbeat has been scheduled.
78 private Cancellable heartbeatSchedule = null;
// Handle of the one-shot InitiateInstallSnapshot timer; null until a check has been scheduled.
79 private Cancellable installSnapshotSchedule = null;
// Trackers for client requests; searched by log index in findClientRequestTracker.
81 private List<ClientRequestTracker> trackerList = new ArrayList<>();
// Number of replicas, counting the leader itself, that must hold an entry before the commit index
// may advance to it. Set to 0 when there are no followers.
83 private final int minReplicationCount;
// Snapshot bytes being installed on followers; absent when none is held in memory.
85 private Optional<ByteString> snapshot;
// Creates the Leader behavior: takes the peer ids from the context as the follower set, creates a
// FollowerLogInformation for each follower, derives the replica count needed to commit, and starts
// the heartbeat and install-snapshot timers.
// NOTE(review): not every line of this constructor is visible in this view (for example the
// argument list of FollowerLogInformationImpl has a gap), so the complete construction cannot be
// confirmed from here.
87 public Leader(RaftActorContext context) {
90 followers = context.getPeerAddresses().keySet();
// One FollowerLogInformation per follower; the first AtomicLong is seeded with the current commit index.
92 for (String followerId : followers) {
93 FollowerLogInformation followerLogInformation =
94 new FollowerLogInformationImpl(followerId,
95 new AtomicLong(context.getCommitIndex()),
97 context.getConfigParams().getElectionTimeOutInterval());
99 followerToLog.put(followerId, followerLogInformation);
102 if(LOG.isDebugEnabled()) {
103 LOG.debug("Election:Leader has following peers: {}", followers);
// Majority of the cluster, where the cluster is the followers plus this leader
// (2 followers -> 2, 4 followers -> 3).
106 if (followers.size() > 0) {
107 minReplicationCount = (followers.size() + 1) / 2 + 1;
109 minReplicationCount = 0;
112 snapshot = Optional.absent();
114 // Immediately schedule a heartbeat
115 // Upon election: send initial empty AppendEntries RPCs
116 // (heartbeat) to each server; repeat during idle periods to
117 // prevent election timeouts (§5.2)
118 scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
// The install-snapshot check is scheduled at 1000 times the configured heartbeat interval.
120 scheduleInstallSnapshotCheck(
121 new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
122 context.getConfigParams().getHeartBeatInterval().unit())
// Accessor for the snapshot held for installation on followers.
// NOTE(review): the body is not visible in this view; presumably it returns the snapshot field — confirm.
127 private Optional<ByteString> getSnapshot() {
132 void setSnapshot(Optional<ByteString> snapshot) {
133 this.snapshot = snapshot;
// Handles an AppendEntries message that arrives while this actor is Leader. The visible code only
// logs the message at debug level.
// NOTE(review): the return statement is not visible in this view.
136 @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
137 AppendEntries appendEntries) {
139 if(LOG.isDebugEnabled()) {
140 LOG.debug(appendEntries.toString());
// Handles a follower's reply to an AppendEntries: updates that follower's replication indexes,
// advances the commit index as far as a majority allows, and applies newly committed entries.
// NOTE(review): several lines of this method (closing braces, the counter increment, the loop
// exit and the return statements) are not visible in this view.
146 @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
147 AppendEntriesReply appendEntriesReply) {
// Failed replies are logged at debug level only.
149 if(! appendEntriesReply.isSuccess()) {
150 if(LOG.isDebugEnabled()) {
151 LOG.debug(appendEntriesReply.toString());
155 // Update the FollowerLogInformation
156 String followerId = appendEntriesReply.getFollowerId();
157 FollowerLogInformation followerLogInformation =
158 followerToLog.get(followerId);
// A reply from a follower that is not in followerToLog is reported as an error.
160 if(followerLogInformation == null){
161 LOG.error("Unknown follower {}", followerId);
// Any reply from a known follower marks that follower as active.
165 followerLogInformation.markFollowerActive();
// On success the follower's matchIndex becomes its reported last log index and nextIndex the one after it.
167 if (appendEntriesReply.isSuccess()) {
168 followerLogInformation
169 .setMatchIndex(appendEntriesReply.getLogLastIndex());
170 followerLogInformation
171 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
174 // TODO: When we find that the follower is out of sync with the
175 // Leader we simply decrement that followers next index by 1.
176 // Would it be possible to do better than this? The RAFT spec
177 // does not explicitly deal with it but may be something for us to
180 followerLogInformation.decrNextIndex();
183 // Now figure out if this reply warrants a change in the commitIndex
184 // If there exists an N such that N > commitIndex, a majority
185 // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
186 // set commitIndex = N (§5.3, §5.4).
// The loop header has no exit condition; N starts just above the current commit index.
187 for (long N = context.getCommitIndex() + 1; ; N++) {
// The count starts at 1 so that the leader's own copy of the entry is included.
188 int replicatedCount = 1;
190 for (FollowerLogInformation info : followerToLog.values()) {
191 if (info.getMatchIndex().get() >= N) {
// The commit index only moves to N when enough replicas hold N and the entry exists in the leader's log.
196 if (replicatedCount >= minReplicationCount) {
197 ReplicatedLogEntry replicatedLogEntry =
198 context.getReplicatedLog().get(N);
199 if (replicatedLogEntry != null
200 && replicatedLogEntry.getTerm()
202 context.setCommitIndex(N);
209 // Apply the change to the state machine
210 if (context.getCommitIndex() > context.getLastApplied()) {
211 applyLogToStateMachine(context.getCommitIndex());
// Looks up the tracker registered for logIndex and, if one exists, removes it from trackerList.
// NOTE(review): the return statement is not visible in this view; presumably the looked-up
// tracker (possibly null) is returned — confirm.
217 protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
219 ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
220 if(toRemove != null) {
221 trackerList.remove(toRemove);
// Scans trackerList in order for a tracker whose index equals logIndex.
// NOTE(review): the return statements are not visible in this view; presumably the matching
// tracker is returned, or null when there is none — confirm.
227 protected ClientRequestTracker findClientRequestTracker(long logIndex) {
228 for (ClientRequestTracker tracker : trackerList) {
229 if (tracker.getIndex() == logIndex) {
// Handles a RequestVoteReply that arrives while this actor is already Leader.
// NOTE(review): the body of this method is not visible in this view.
237 @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
238 RequestVoteReply requestVoteReply) {
242 @Override public RaftState state() {
243 return RaftState.Leader;
// Entry point for every message delivered to the actor while it is Leader. Steps down to Follower
// when an RPC carries a newer term, handles the leader-only messages (heartbeat tick, snapshot
// installation, replication requests) itself, and passes the message on to the superclass handler.
// NOTE(review): some lines (closing braces, the body of the SendHeartBeat branch, and whatever
// wraps the scheduleHeartBeat call) are not visible in this view.
246 @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
247 Preconditions.checkNotNull(sender, "sender should not be null");
// Everything below dispatches on the object returned by fromSerializableMessage, not on the raw message.
249 Object message = fromSerializableMessage(originalMessage);
251 if (message instanceof RaftRPC) {
252 RaftRPC rpc = (RaftRPC) message;
253 // If RPC request or response contains term T > currentTerm:
254 // set currentTerm = T, convert to follower (§5.1)
255 // This applies to all RPC messages and responses
256 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
// The newer term is persisted with a null second argument before switching to a new Follower behavior.
257 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
259 return switchBehavior(new Follower(context));
264 if (message instanceof SendHeartBeat) {
268 } else if(message instanceof InitiateInstallSnapshot) {
269 installSnapshotIfNeeded();
271 } else if(message instanceof SendInstallSnapshot) {
272 // received from RaftActor
// Keep the captured snapshot in memory, then start sending it to the followers that need it.
273 setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
274 sendInstallSnapshot();
276 } else if (message instanceof Replicate) {
277 replicate((Replicate) message);
279 } else if (message instanceof InstallSnapshotReply){
280 handleInstallSnapshotReply(
281 (InstallSnapshotReply) message);
// Re-arm the heartbeat timer with the configured heartbeat interval.
284 scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
287 return super.handleMessage(sender, message);
290 private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
291 String followerId = reply.getFollowerId();
292 FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
293 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
294 followerLogInformation.markFollowerActive();
296 if (followerToSnapshot != null &&
297 followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
299 if (reply.isSuccess()) {
300 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
301 //this was the last chunk reply
302 if(LOG.isDebugEnabled()) {
303 LOG.debug("InstallSnapshotReply received, " +
304 "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
305 reply.getChunkIndex(), followerId,
306 context.getReplicatedLog().getSnapshotIndex() + 1
310 followerLogInformation.setMatchIndex(
311 context.getReplicatedLog().getSnapshotIndex());
312 followerLogInformation.setNextIndex(
313 context.getReplicatedLog().getSnapshotIndex() + 1);
314 mapFollowerToSnapshot.remove(followerId);
316 if(LOG.isDebugEnabled()) {
317 LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
318 followerToLog.get(followerId).getNextIndex().get());
321 if (mapFollowerToSnapshot.isEmpty()) {
322 // once there are no pending followers receiving snapshots
323 // we can remove snapshot from the memory
324 setSnapshot(Optional.<ByteString>absent());
328 followerToSnapshot.markSendStatus(true);
331 LOG.info("InstallSnapshotReply received, " +
332 "sending snapshot chunk failed, Will retry, Chunk:{}",
333 reply.getChunkIndex()
335 followerToSnapshot.markSendStatus(false);
339 LOG.error("ERROR!!" +
340 "FollowerId in InstallSnapshotReply not known to Leader" +
341 " or Chunk Index in InstallSnapshotReply not matching {} != {}",
342 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
// Handles a Replicate request for a log entry: builds a ClientRequestTracker from the request's
// client actor and identifier, and, when there are no followers, commits and applies the entry
// immediately.
// NOTE(review): the lines that consume the new tracker and the branch taken when followers exist
// are not visible in this view.
347 private void replicate(Replicate replicate) {
348 long logIndex = replicate.getReplicatedLogEntry().getIndex();
350 if(LOG.isDebugEnabled()) {
351 LOG.debug("Replicate message {}", logIndex);
354 // Create a tracker entry we will use this later to notify the
357 new ClientRequestTrackerImpl(replicate.getClientActor(),
358 replicate.getIdentifier(),
// With no followers there is nobody to wait for: the entry is committed and applied right away.
362 if (followers.size() == 0) {
363 context.setCommitIndex(logIndex);
364 applyLogToStateMachine(logIndex);
// Sends one message to every follower whose actor selection can be resolved. Depending on the
// follower's state this is the next snapshot chunk, an AppendEntries carrying one log entry, or an
// empty AppendEntries acting as a heartbeat.
// NOTE(review): the braces and else keywords of this method are not visible in this view, so the
// branch nesting described in the comments below is inferred from statement order — confirm.
370 private void sendAppendEntries() {
371 // Send an AppendEntries to all followers
372 for (String followerId : followers) {
373 ActorSelection followerActor = context.getPeerActorSelection(followerId);
375 if (followerActor != null) {
376 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
377 long followerNextIndex = followerLogInformation.getNextIndex().get();
378 boolean isFollowerActive = followerLogInformation.isFollowerActive();
379 List<ReplicatedLogEntry> entries = null;
// A map entry for this follower means a snapshot transfer to it is in progress.
381 if (mapFollowerToSnapshot.get(followerId) != null) {
382 // if install snapshot is in process , then sent next chunk if possible
383 if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
384 sendSnapshotChunk(followerActor, followerId);
386 // we send a heartbeat even if we have not received a reply for the last chunk
387 sendAppendEntriesToFollower(followerActor, followerNextIndex,
388 Collections.<ReplicatedLogEntry>emptyList());
392 long leaderLastIndex = context.getReplicatedLog().lastIndex();
393 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
// Active follower whose next entry is still in the leader's log: send exactly that one entry.
395 if (isFollowerActive &&
396 context.getReplicatedLog().isPresent(followerNextIndex)) {
397 // FIXME : Sending one entry at a time
398 entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
400 } else if (isFollowerActive && followerNextIndex >= 0 &&
401 leaderLastIndex >= followerNextIndex ) {
402 // if the followers next index is not present in the leaders log, and
403 // if the follower is just not starting and if leader's index is more than followers index
404 // then snapshot should be sent
406 if(LOG.isDebugEnabled()) {
407 LOG.debug("InitiateInstallSnapshot to follower:{}," +
408 "follower-nextIndex:{}, leader-snapshot-index:{}, " +
409 "leader-last-index:{}", followerId,
410 followerNextIndex, leaderSnapShotIndex, leaderLastIndex
// The snapshot install is requested by sending a message to this actor itself.
413 actor().tell(new InitiateInstallSnapshot(), actor());
415 // we would want to sent AE as the capture snapshot might take time
416 entries = Collections.<ReplicatedLogEntry>emptyList();
419 //we send an AppendEntries, even if the follower is inactive
420 // in-order to update the followers timestamp, in case it becomes active again
421 entries = Collections.<ReplicatedLogEntry>emptyList();
424 sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
// Builds an AppendEntries for one follower from the current term, this node's id, the previous
// log index and term derived from followerNextIndex, the given entries and the leader's commit
// index, and converts it with toSerializable().
// NOTE(review): the statement that sends the message to followerActor is not visible in this view.
431 private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
432 List<ReplicatedLogEntry> entries) {
434 new AppendEntries(currentTerm(), context.getId(),
435 prevLogIndex(followerNextIndex),
436 prevLogTerm(followerNextIndex), entries,
437 context.getCommitIndex()).toSerializable(),
443 * An installSnapshot check is scheduled at an interval that is a multiple of
444 * the HEARTBEAT_INTERVAL. This avoids the need to check for installing
445 * snapshots at every heartbeat.
447 * Install Snapshot works as follows:
448 * 1. Leader sends an InitiateInstallSnapshot message to itself
449 * 2. Leader then initiates the snapshot capture by sending a CaptureSnapshot message to the actor
450 * 3. RaftActor, on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
451 * and makes a call to Leader's handleMessage with a SendInstallSnapshot message.
452 * 4. Leader picks the snapshot from the in-memory ReplicatedLog and sends it in chunks to the Follower
453 * 5. On completion, the Follower sends back an InstallSnapshotReply.
454 * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
455 * and frees the memory by deleting the snapshot in the Replicated log.
// Checks every follower with a resolvable actor selection. A follower needs a snapshot when its
// next index is no longer in the leader's log but is covered by the leader's snapshot. Such a
// follower gets the next chunk if a snapshot is already held in memory; otherwise a snapshot
// capture is initiated.
// NOTE(review): the statement following the trailing comments (original lines after 481) is not
// visible in this view; the comments suggest the loop stops after the first capture — confirm.
458 private void installSnapshotIfNeeded() {
459 for (String followerId : followers) {
460 ActorSelection followerActor =
461 context.getPeerActorSelection(followerId);
463 if(followerActor != null) {
464 FollowerLogInformation followerLogInformation =
465 followerToLog.get(followerId);
467 long nextIndex = followerLogInformation.getNextIndex().get();
469 if (!context.getReplicatedLog().isPresent(nextIndex) &&
470 context.getReplicatedLog().isInSnapshot(nextIndex)) {
471 LOG.info("{} follower needs a snapshot install", followerId);
472 if (snapshot.isPresent()) {
473 // if a snapshot is present in the memory, most likely another install is in progress
474 // no need to capture snapshot
475 sendSnapshotChunk(followerActor, followerId);
478 initiateCaptureSnapshot();
479 //we just need 1 follower who would need snapshot to be installed.
480 // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
481 // who needs an install and send to all who need
490 // on every install snapshot, we try to capture the snapshot.
491 // Once a capture is going on, another one issued will get ignored by RaftActor.
// Asks the actor to capture a snapshot by sending it a CaptureSnapshot message that carries the
// leader's last index and term, the last applied index and term, and a flag marking the capture
// as initiated for a snapshot install.
// NOTE(review): the final argument of the tell(...) call is not visible in this view.
492 private void initiateCaptureSnapshot() {
493 LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
494 ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
// -1 is used for both values when neither the log entry nor a snapshot provides them.
495 long lastAppliedIndex = -1;
496 long lastAppliedTerm = -1;
498 if (lastAppliedEntry != null) {
499 lastAppliedIndex = lastAppliedEntry.getIndex();
500 lastAppliedTerm = lastAppliedEntry.getTerm();
// The last applied entry is not in the log: fall back to the snapshot's index and term if one exists.
501 } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
502 lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
503 lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
506 boolean isInstallSnapshotInitiated = true;
507 actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
508 lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
513 private void sendInstallSnapshot() {
514 for (String followerId : followers) {
515 ActorSelection followerActor = context.getPeerActorSelection(followerId);
517 if(followerActor != null) {
518 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
519 long nextIndex = followerLogInformation.getNextIndex().get();
521 if (!context.getReplicatedLog().isPresent(nextIndex) &&
522 context.getReplicatedLog().isInSnapshot(nextIndex)) {
523 sendSnapshotChunk(followerActor, followerId);
530 * Sends a snapshot chunk to a given follower
531 * InstallSnapshot should qualify as a heartbeat too.
// Builds the next InstallSnapshot message for one follower, but only while a snapshot is held in
// memory. The message carries the current term, this node's id, the snapshot index and term, the
// next chunk of snapshot bytes, that chunk's index and the total number of chunks.
// NOTE(review): the try statement and the call that sends the message are not visible in this view.
533 private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
535 if (snapshot.isPresent()) {
537 new InstallSnapshot(currentTerm(), context.getId(),
538 context.getReplicatedLog().getSnapshotIndex(),
539 context.getReplicatedLog().getSnapshotTerm(),
// getNextSnapshotChunk creates the follower's map entry if it is missing, so the
// mapFollowerToSnapshot.get(followerId) calls in the following arguments find an entry.
540 getNextSnapshotChunk(followerId,snapshot.get()),
541 mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
542 mapFollowerToSnapshot.get(followerId).getTotalChunks()
546 LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
547 followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
548 mapFollowerToSnapshot.get(followerId).getTotalChunks());
// An IOException is logged and not rethrown.
550 } catch (IOException e) {
551 LOG.error(e, "InstallSnapshot failed for Leader.");
556 * Accepts the snapshot as a ByteString, enters it into the map for future chunks,
557 * then creates and returns a ByteString chunk
// Obtains the next chunk of the snapshot for the given follower. The follower's FollowerToSnapshot
// is created from snapshotBytes and stored in mapFollowerToSnapshot on first use; later calls
// reuse the stored instance and ignore snapshotBytes.
// NOTE(review): the return statement is not visible in this view; presumably nextChunk is returned — confirm.
559 private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
560 FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
561 if (followerToSnapshot == null) {
562 followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
563 mapFollowerToSnapshot.put(followerId, followerToSnapshot);
565 ByteString nextChunk = followerToSnapshot.getNextChunk();
566 if (LOG.isDebugEnabled()) {
567 LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
// Heartbeat handler; it only acts when this leader has at least one follower.
// NOTE(review): the statement inside the if is not visible in this view.
572 private void sendHeartBeat() {
573 if (followers.size() > 0) {
578 private void stopHeartBeat() {
579 if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
580 heartbeatSchedule.cancel();
584 private void stopInstallSnapshotSchedule() {
585 if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
586 installSnapshotSchedule.cancel();
// Schedules a single SendHeartBeat message to this actor after the given interval and remembers
// the timer handle in heartbeatSchedule. The timer is one-shot (scheduleOnce), so it has to be
// re-armed by the caller.
// NOTE(review): some lines are not visible in this view, including the body of the no-followers
// branch and anything between it and the scheduling call.
590 private void scheduleHeartBeat(FiniteDuration interval) {
591 if(followers.size() == 0){
592 // Optimization - do not bother scheduling a heartbeat as there are
599 // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
600 // message is sent to itself.
601 // Scheduling the heartbeat only once here because heartbeats do not
602 // need to be sent if there are other messages being sent to the remote
// The actor is both the receiver and the declared sender of the scheduled message.
604 heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
605 interval, context.getActor(), new SendHeartBeat(),
606 context.getActorSystem().dispatcher(), context.getActor());
// Cancels any pending install-snapshot timer, then schedules a single InitiateInstallSnapshot
// message to this actor and remembers the timer handle in installSnapshotSchedule.
// NOTE(review): the two original comments below mention heartbeats and append entries, but the
// message scheduled here is InitiateInstallSnapshot; they look copied from scheduleHeartBeat.
// NOTE(review): some lines are not visible in this view, including the body of the no-followers
// branch and the first argument of scheduleOnce.
609 private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
610 if(followers.size() == 0){
611 // Optimization - do not bother scheduling a heartbeat as there are
616 stopInstallSnapshotSchedule();
618 // Schedule a message to send append entries to followers that can
619 // accept an append entries with some data in it
620 installSnapshotSchedule =
621 context.getActorSystem().scheduler().scheduleOnce(
623 context.getActor(), new InitiateInstallSnapshot(),
624 context.getActorSystem().dispatcher(), context.getActor());
// Called when this behavior is closed.
// NOTE(review): the body is not visible in this view; presumably it stops the timers this
// behavior scheduled — confirm.
629 @Override public void close() throws Exception {
633 @Override public String getLeaderId() {
634 return context.getId();
638 * Encapsulates the snapshot bytestring and handles the logic of sending
// Tracks the chunk-by-chunk transfer of one snapshot to one follower: the snapshot bytes, the
// byte offset and index of the current chunk, the total number of chunks, and whether a reply for
// the chunk at the current offset has been received.
// NOTE(review): many lines of this class (return statements, conditions and closing braces) are
// not visible in this view; the comments below describe the visible statements only.
641 protected class FollowerToSnapshot {
642 private ByteString snapshotBytes;
643 private int offset = 0;
644 // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
645 private int replyReceivedForOffset;
646 // if replyStatus is false, the previous chunk is attempted
647 private boolean replyStatus = false;
648 private int chunkIndex;
649 private int totalChunks;
// Stores the snapshot and computes how many chunks of the configured chunk size it needs.
651 public FollowerToSnapshot(ByteString snapshotBytes) {
652 this.snapshotBytes = snapshotBytes;
// -1 never equals a non-negative offset, so initially no reply counts as received.
653 replyReceivedForOffset = -1;
655 int size = snapshotBytes.size();
// Integer division rounded up: one extra chunk when the size is not a multiple of the chunk size.
656 totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
657 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
658 if(LOG.isDebugEnabled()) {
659 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
664 public ByteString getSnapshotBytes() {
665 return snapshotBytes;
// Advances offset by one configured chunk size.
// NOTE(review): the condition guarding the increment and the return statement are not visible.
668 public int incrementOffset() {
670 // if prev chunk failed, we would want to sent the same chunk again
671 offset = offset + context.getConfigParams().getSnapshotChunkSize();
// Advances chunkIndex by one.
// NOTE(review): the condition guarding the increment and the return statement are not visible.
676 public int incrementChunkIndex() {
678 // if prev chunk failed, we would want to sent the same chunk again
679 chunkIndex = chunkIndex + 1;
684 public int getChunkIndex() {
688 public int getTotalChunks() {
// True when a reply has been recorded for the chunk at the current offset.
692 public boolean canSendNextChunk() {
693 // we only send a false if a chunk is sent but we have not received a reply yet
694 return replyReceivedForOffset == offset;
// A chunk is the last one when its index equals the total number of chunks.
697 public boolean isLastChunk(int chunkIndex) {
698 return totalChunks == chunkIndex;
// Records that a reply arrived for the chunk at the current offset. Both visible branches store
// the current offset.
// NOTE(review): the branch condition and any other assignments in the branches are not visible.
701 public void markSendStatus(boolean success) {
703 // if the chunk sent was successful
704 replyReceivedForOffset = offset;
707 // if the chunk sent was failure
708 replyReceivedForOffset = offset;
// Returns the slice of the snapshot that starts at the offset given by incrementOffset(). The
// slice length is the configured chunk size, reduced so that it never runs past the end of the
// snapshot.
713 public ByteString getNextChunk() {
714 int snapshotLength = getSnapshotBytes().size();
715 int start = incrementOffset();
716 int size = context.getConfigParams().getSnapshotChunkSize();
// The whole snapshot is smaller than one chunk.
717 if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
718 size = snapshotLength;
// A full chunk from start would run past the end: send only the remaining bytes.
720 if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
721 size = snapshotLength - start;
725 if(LOG.isDebugEnabled()) {
726 LOG.debug("length={}, offset={},size={}",
727 snapshotLength, start, size);
729 return getSnapshotBytes().substring(start, start + size);
734 // called from example-actor for printing the follower-states
735 public String printFollowerStates() {
736 StringBuilder sb = new StringBuilder();
737 for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
738 boolean isFollowerActive = followerLogInformation.isFollowerActive();
739 sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
742 return "[" + sb.toString() + "]";
746 void markFollowerActive(String followerId) {
747 followerToLog.get(followerId).markFollowerActive();