/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import com.google.protobuf.ByteString;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
17 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
20 import org.opendaylight.controller.cluster.raft.RaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftState;
22 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
23 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import scala.concurrent.duration.FiniteDuration;
34 import java.io.IOException;
35 import java.util.ArrayList;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicLong;
/**
 * The behavior of a RaftActor when it is in the Leader state
 * <ul>
 * <li> Upon election: send initial empty AppendEntries RPCs
 * (heartbeat) to each server; repeat during idle periods to
 * prevent election timeouts (§5.2)
 * <li> If command received from client: append entry to local log,
 * respond after entry applied to state machine (§5.3)
 * <li> If last log index ≥ nextIndex for a follower: send
 * AppendEntries RPC with log entries starting at nextIndex
 * <ul>
 * <li> If successful: update nextIndex and matchIndex for
 * follower (§5.3)
 * <li> If AppendEntries fails because of log inconsistency:
 * decrement nextIndex and retry (§5.3)
 * </ul>
 * <li> If there exists an N such that N > commitIndex, a majority
 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
 * set commitIndex = N (§5.3, §5.4).
 * </ul>
 */
66 public class Leader extends AbstractRaftActorBehavior {
// Replication bookkeeping per follower, keyed by follower id; populated by the constructor.
// NOTE(review): the initializer expression of this field is not visible in this extract.
69 protected final Map<String, FollowerLogInformation> followerToLog =
// Snapshot transfers in progress, keyed by follower id. Entries are created in
// getNextSnapshotChunk() and removed once the last chunk has been acknowledged.
71 protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
// Ids of the peers; assigned from context.getPeerAddresses().keySet() in the constructor.
73 private final Set<String> followers;
// Cancelled by stopHeartBeat(); presumably holds the pending SendHeartBeat timer - confirm.
75 private Cancellable heartbeatSchedule = null;
// NOTE(review): no use of this field is visible in this extract.
76 private Cancellable appendEntriesSchedule = null;
// Handle of the pending one-shot SendInstallSnapshot timer (see scheduleInstallSnapshotCheck).
77 private Cancellable installSnapshotSchedule = null;
// Client request trackers, looked up by log index in findClientRequestTracker().
79 private List<ClientRequestTracker> trackerList = new ArrayList<>();
// Replica count an entry must reach before handleAppendEntriesReply() commits it.
81 private final int minReplicationCount;
/**
 * Creates the Leader behavior for the given context.
 *
 * Visible steps: take the peer ids as the follower set, create one
 * FollowerLogInformation per follower, derive minReplicationCount, then
 * schedule an immediate heartbeat and the first install-snapshot check.
 */
83 public Leader(RaftActorContext context) {
86 followers = context.getPeerAddresses().keySet();
88 for (String followerId : followers) {
89 FollowerLogInformation followerLogInformation =
90 new FollowerLogInformationImpl(followerId,
91 new AtomicLong(context.getCommitIndex()),
// NOTE(review): any further constructor arguments are not visible in this extract.
94 followerToLog.put(followerId, followerLogInformation);
97 if(LOG.isDebugEnabled()) {
98 LOG.debug("Election:Leader has following peers: {}", followers);
// With followers present the threshold is (followers + 1) / 2 + 1; the counter it is
// compared against in handleAppendEntriesReply() starts at 1.
101 if (followers.size() > 0) {
102 minReplicationCount = (followers.size() + 1) / 2 + 1;
// Without followers no replication threshold applies.
104 minReplicationCount = 0;
108 // Immediately schedule a heartbeat
109 // Upon election: send initial empty AppendEntries RPCs
110 // (heartbeat) to each server; repeat during idle periods to
111 // prevent election timeouts (§5.2)
112 scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
// The first install-snapshot check is delayed by 1000 times the heartbeat interval
// length, expressed in the heartbeat interval's own time unit.
114 scheduleInstallSnapshotCheck(
115 new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
116 context.getConfigParams().getHeartBeatInterval().unit())
/**
 * Handles an AppendEntries message received while in the Leader state.
 * The visible code only logs the message at debug level.
 * NOTE(review): the return statement is not visible in this extract.
 */
121 @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
122 AppendEntries appendEntries) {
124 if(LOG.isDebugEnabled()) {
125 LOG.debug(appendEntries.toString());
/**
 * Handles a follower's reply to AppendEntries: updates that follower's
 * match/next index, tries to advance the commit index, and applies newly
 * committed entries to the state machine.
 * NOTE(review): the return statements are not visible in this extract.
 */
131 @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
132 AppendEntriesReply appendEntriesReply) {
// Unsuccessful replies are logged at debug level before being processed.
134 if(! appendEntriesReply.isSuccess()) {
135 if(LOG.isDebugEnabled()) {
136 LOG.debug(appendEntriesReply.toString());
140 // Update the FollowerLogInformation
141 String followerId = appendEntriesReply.getFollowerId();
142 FollowerLogInformation followerLogInformation =
143 followerToLog.get(followerId);
// A reply from an id that is not in followerToLog is reported as an error.
// NOTE(review): what follows the error log is not visible in this extract.
145 if(followerLogInformation == null){
146 LOG.error("Unknown follower {}", followerId);
// Success: the follower's match index becomes its reported last index and the
// next index to send is the one right after it.
150 if (appendEntriesReply.isSuccess()) {
151 followerLogInformation
152 .setMatchIndex(appendEntriesReply.getLogLastIndex());
153 followerLogInformation
154 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
157 // TODO: When we find that the follower is out of sync with the
158 // Leader we simply decrement that followers next index by 1.
159 // Would it be possible to do better than this? The RAFT spec
160 // does not explicitly deal with it but may be something for us to
163 followerLogInformation.decrNextIndex();
166 // Now figure out if this reply warrants a change in the commitIndex
167 // If there exists an N such that N > commitIndex, a majority
168 // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
169 // set commitIndex = N (§5.3, §5.4).
// NOTE(review): the loop header has no termination condition; the exit path is
// not visible in this extract.
170 for (long N = context.getCommitIndex() + 1; ; N++) {
// The count starts at 1, presumably for the leader's own copy of the entry.
171 int replicatedCount = 1;
173 for (FollowerLogInformation info : followerToLog.values()) {
174 if (info.getMatchIndex().get() >= N) {
179 if (replicatedCount >= minReplicationCount) {
180 ReplicatedLogEntry replicatedLogEntry =
181 context.getReplicatedLog().get(N);
// NOTE(review): the operand the entry's term is compared against is not visible here.
182 if (replicatedLogEntry != null
183 && replicatedLogEntry.getTerm()
185 context.setCommitIndex(N);
192 // Apply the change to the state machine
193 if (context.getCommitIndex() > context.getLastApplied()) {
194 applyLogToStateMachine(context.getCommitIndex());
/**
 * Removes the tracker registered for the given log index from trackerList,
 * if findClientRequestTracker() finds one.
 * NOTE(review): the return statement is not visible in this extract.
 */
200 protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
202 ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
203 if(toRemove != null) {
204 trackerList.remove(toRemove);
/**
 * Scans trackerList for a tracker whose index equals the given log index.
 * NOTE(review): the return statements (match and no-match) are not visible in this extract.
 */
210 protected ClientRequestTracker findClientRequestTracker(long logIndex) {
211 for (ClientRequestTracker tracker : trackerList) {
212 if (tracker.getIndex() == logIndex) {
/**
 * Handles a RequestVoteReply received while in the Leader state.
 * NOTE(review): the method body is not visible in this extract.
 */
220 @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
221 RequestVoteReply requestVoteReply) {
225 @Override public RaftState state() {
226 return RaftState.Leader;
/**
 * Entry point for messages delivered to the leader. A RaftRPC carrying a term
 * newer than the current term makes this node persist the term and switch to
 * Follower; leader-specific messages are dispatched to their handlers; anything
 * else is passed on to the superclass.
 */
229 @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
230 Preconditions.checkNotNull(sender, "sender should not be null");
// Work on the de-serialized form of the message from here on.
232 Object message = fromSerializableMessage(originalMessage);
234 if (message instanceof RaftRPC) {
235 RaftRPC rpc = (RaftRPC) message;
236 // If RPC request or response contains term T > currentTerm:
237 // set currentTerm = T, convert to follower (§5.1)
238 // This applies to all RPC messages and responses
239 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
240 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
242 return switchBehavior(new Follower(context));
// NOTE(review): the statements of the SendHeartBeat branch are not visible in this extract.
247 if (message instanceof SendHeartBeat) {
250 } else if(message instanceof SendInstallSnapshot) {
251 installSnapshotIfNeeded();
252 } else if (message instanceof Replicate) {
253 replicate((Replicate) message);
254 } else if (message instanceof InstallSnapshotReply){
255 handleInstallSnapshotReply(
256 (InstallSnapshotReply) message);
// Re-arm the heartbeat timer with the configured interval.
// NOTE(review): the enclosing construct (presumably try/finally) is not visible here.
259 scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
262 return super.handleMessage(sender, message);
265 private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
266 String followerId = reply.getFollowerId();
267 FollowerToSnapshot followerToSnapshot =
268 mapFollowerToSnapshot.get(followerId);
270 if (followerToSnapshot != null &&
271 followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
273 if (reply.isSuccess()) {
274 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
275 //this was the last chunk reply
276 if(LOG.isDebugEnabled()) {
277 LOG.debug("InstallSnapshotReply received, " +
278 "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
279 reply.getChunkIndex(), followerId,
280 context.getReplicatedLog().getSnapshotIndex() + 1
284 FollowerLogInformation followerLogInformation =
285 followerToLog.get(followerId);
286 followerLogInformation.setMatchIndex(
287 context.getReplicatedLog().getSnapshotIndex());
288 followerLogInformation.setNextIndex(
289 context.getReplicatedLog().getSnapshotIndex() + 1);
290 mapFollowerToSnapshot.remove(followerId);
292 if(LOG.isDebugEnabled()) {
293 LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
294 followerToLog.get(followerId).getNextIndex().get());
298 followerToSnapshot.markSendStatus(true);
301 LOG.info("InstallSnapshotReply received, " +
302 "sending snapshot chunk failed, Will retry, Chunk:{}",
303 reply.getChunkIndex()
305 followerToSnapshot.markSendStatus(false);
309 LOG.error("ERROR!!" +
310 "FollowerId in InstallSnapshotReply not known to Leader" +
311 " or Chunk Index in InstallSnapshotReply not matching {} != {}",
312 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
/**
 * Handles a Replicate request for a log entry: a client request tracker is
 * created for it and, when there are no followers, the entry is committed and
 * applied right away.
 * NOTE(review): the handling for the case with followers is not visible in this extract.
 */
317 private void replicate(Replicate replicate) {
318 long logIndex = replicate.getReplicatedLogEntry().getIndex();
320 if(LOG.isDebugEnabled()) {
321 LOG.debug("Replicate message {}", logIndex);
324 // Create a tracker entry we will use this later to notify the
// NOTE(review): the statement that stores the new tracker is not visible in this extract.
327 new ClientRequestTrackerImpl(replicate.getClientActor(),
328 replicate.getIdentifier(),
// With no followers nothing has to be replicated: commit and apply immediately.
332 if (followers.size() == 0) {
333 context.setCommitIndex(logIndex);
334 applyLogToStateMachine(logIndex);
/**
 * Walks over all followers and, for each one with a resolvable actor, either
 * continues a snapshot transfer in progress, builds an AppendEntries message
 * from the follower's next index, or triggers a SendInstallSnapshot to self.
 * NOTE(review): the statements that hand the AppendEntries messages to the
 * follower actor, and the else-branch framing, are not visible in this extract.
 */
340 private void sendAppendEntries() {
341 // Send an AppendEntries to all followers
342 for (String followerId : followers) {
343 ActorSelection followerActor = context.getPeerActorSelection(followerId);
345 if (followerActor != null) {
346 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
347 long followerNextIndex = followerLogInformation.getNextIndex().get();
// Default payload is empty.
348 List<ReplicatedLogEntry> entries = Collections.emptyList();
// A snapshot transfer is in progress for this follower: send the next chunk,
// but only once the previous chunk's reply has been recorded.
350 if (mapFollowerToSnapshot.get(followerId) != null) {
351 if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
352 sendSnapshotChunk(followerActor, followerId);
357 if (context.getReplicatedLog().isPresent(followerNextIndex)) {
358 // FIXME : Sending one entry at a time
359 entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
362 new AppendEntries(currentTerm(), context.getId(),
363 prevLogIndex(followerNextIndex),
364 prevLogTerm(followerNextIndex), entries,
365 context.getCommitIndex()).toSerializable(),
370 // if the followers next index is not present in the leaders log, then snapshot should be sent
371 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
372 long leaderLastIndex = context.getReplicatedLog().lastIndex();
373 if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
374 // if the follower is just not starting and leader's index
375 // is more than followers index
376 if(LOG.isDebugEnabled()) {
377 LOG.debug("SendInstallSnapshot to follower:{}," +
378 "follower-nextIndex:{}, leader-snapshot-index:{}, " +
379 "leader-last-index:{}", followerId,
380 followerNextIndex, leaderSnapShotIndex, leaderLastIndex
// Ask this actor to run the install-snapshot check by messaging itself.
384 actor().tell(new SendInstallSnapshot(), actor());
387 new AppendEntries(currentTerm(), context.getId(),
388 prevLogIndex(followerNextIndex),
389 prevLogTerm(followerNextIndex), entries,
390 context.getCommitIndex()).toSerializable(),
401 * An installSnapshot is scheduled at a interval that is a multiple of
402 * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
403 * snapshots at every heartbeat.
405 private void installSnapshotIfNeeded(){
406 for (String followerId : followers) {
407 ActorSelection followerActor =
408 context.getPeerActorSelection(followerId);
410 if(followerActor != null) {
411 FollowerLogInformation followerLogInformation =
412 followerToLog.get(followerId);
414 long nextIndex = followerLogInformation.getNextIndex().get();
416 if (!context.getReplicatedLog().isPresent(nextIndex) &&
417 context.getReplicatedLog().isInSnapshot(nextIndex)) {
418 sendSnapshotChunk(followerActor, followerId);
/**
 * Sends a snapshot chunk to a given follower.
 * InstallSnapshot should qualify as a heartbeat too.
 */
// NOTE(review): the opening of the try block and the statement that hands the
// InstallSnapshot message to followerActor are not visible in this extract.
428 private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
// getNextSnapshotChunk() is evaluated before the mapFollowerToSnapshot.get(...)
// arguments below (Java evaluates arguments left to right), and it creates the
// map entry when absent, so those lookups find an entry.
431 new InstallSnapshot(currentTerm(), context.getId(),
432 context.getReplicatedLog().getSnapshotIndex(),
433 context.getReplicatedLog().getSnapshotTerm(),
434 getNextSnapshotChunk(followerId,
435 context.getReplicatedLog().getSnapshot()),
436 mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
437 mapFollowerToSnapshot.get(followerId).getTotalChunks()
441 LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
442 followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
443 mapFollowerToSnapshot.get(followerId).getTotalChunks());
// An IOException is logged and not propagated to the caller.
444 } catch (IOException e) {
445 LOG.error(e, "InstallSnapshot failed for Leader.");
450 * Acccepts snaphot as ByteString, enters into map for future chunks
451 * creates and return a ByteString chunk
453 private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
454 FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
455 if (followerToSnapshot == null) {
456 followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
457 mapFollowerToSnapshot.put(followerId, followerToSnapshot);
459 ByteString nextChunk = followerToSnapshot.getNextChunk();
460 if(LOG.isDebugEnabled()) {
461 LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
/**
 * Heartbeat handler; only acts when there is at least one follower.
 * NOTE(review): the action taken inside the guard is not visible in this extract.
 */
467 private void sendHeartBeat() {
468 if (followers.size() > 0) {
473 private void stopHeartBeat() {
474 if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
475 heartbeatSchedule.cancel();
479 private void stopInstallSnapshotSchedule() {
480 if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
481 installSnapshotSchedule.cancel();
/**
 * Arms a one-shot timer that delivers a SendHeartBeat message to this actor.
 * NOTE(review): the early exit for the no-follower case, the variable the timer
 * handle is stored in, and the delay argument are not visible in this extract.
 */
485 private void scheduleHeartBeat(FiniteDuration interval) {
486 if(followers.size() == 0){
487 // Optimization - do not bother scheduling a heartbeat as there are
494 // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
495 // message is sent to itself.
496 // Scheduling the heartbeat only once here because heartbeats do not
497 // need to be sent if there are other messages being sent to the remote
500 context.getActorSystem().scheduler().scheduleOnce(
// Receiver and sender are both this actor.
502 context.getActor(), new SendHeartBeat(),
503 context.getActorSystem().dispatcher(), context.getActor());
/**
 * Arms a one-shot timer that delivers a SendInstallSnapshot message to this
 * actor, after cancelling any previously armed install-snapshot timer.
 * NOTE(review): the early exit for the no-follower case and the delay argument
 * are not visible in this extract.
 */
507 private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
508 if(followers.size() == 0){
// NOTE(review): the comment below talks about a heartbeat, but this method
// schedules the install-snapshot check.
509 // Optimization - do not bother scheduling a heartbeat as there are
514 stopInstallSnapshotSchedule();
// NOTE(review): the comment below mentions append entries, but the message
// scheduled here is SendInstallSnapshot.
516 // Schedule a message to send append entries to followers that can
517 // accept an append entries with some data in it
518 installSnapshotSchedule =
519 context.getActorSystem().scheduler().scheduleOnce(
// Receiver and sender are both this actor.
521 context.getActor(), new SendInstallSnapshot(),
522 context.getActorSystem().dispatcher(), context.getActor());
/**
 * Closes this behavior.
 * NOTE(review): the body is not visible in this extract - confirm that both the
 * heartbeat timer and the install-snapshot timer are cancelled here.
 */
527 @Override public void close() throws Exception {
531 @Override public String getLeaderId() {
532 return context.getId();
/**
 * Encapsulates the snapshot bytestring and handles the logic of sending
 * snapshot chunks.
 */
// Non-static inner class: its methods read the enclosing Leader's context and LOG.
539 protected class FollowerToSnapshot {
// The complete snapshot being transferred.
540 private ByteString snapshotBytes;
// Start position, within snapshotBytes, of the chunk returned by getNextChunk().
541 private int offset = 0;
542 // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
543 private int replyReceivedForOffset;
544 // if replyStatus is false, the previous chunk is attempted
545 private boolean replyStatus = false;
// Index of the chunk most recently handed out; compared with the index carried in replies.
546 private int chunkIndex;
// Number of chunks the snapshot is split into; computed in the constructor.
547 private int totalChunks;
/**
 * Starts tracking a snapshot transfer for the given snapshot bytes and
 * computes how many chunks of the configured size are needed.
 */
549 public FollowerToSnapshot(ByteString snapshotBytes) {
550 this.snapshotBytes = snapshotBytes;
// -1 differs from the initial offset (0).
551 replyReceivedForOffset = -1;
// NOTE(review): a line between these two statements is not visible in this extract.
553 int size = snapshotBytes.size();
// Ceiling division: one extra chunk when the size is not a multiple of the chunk size.
554 totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
555 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
556 if(LOG.isDebugEnabled()) {
// NOTE(review): the arguments of this log call are not visible in this extract.
557 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
562 public ByteString getSnapshotBytes() {
563 return snapshotBytes;
/**
 * Moves offset forward by one configured chunk size.
 * NOTE(review): the existing comment suggests the increment is conditional on the
 * previous chunk's outcome; the condition and the return statement are not
 * visible in this extract.
 */
566 public int incrementOffset() {
568 // if prev chunk failed, we would want to sent the same chunk again
569 offset = offset + context.getConfigParams().getSnapshotChunkSize();
/**
 * Moves chunkIndex forward by one.
 * NOTE(review): the existing comment suggests the increment is conditional on the
 * previous chunk's outcome; the condition and the return statement are not
 * visible in this extract.
 */
574 public int incrementChunkIndex() {
576 // if prev chunk failed, we would want to sent the same chunk again
577 chunkIndex = chunkIndex + 1;
// Accessor used when sending chunks and when matching replies.
// NOTE(review): the body is not visible in this extract; presumably returns chunkIndex.
582 public int getChunkIndex() {
// Accessor used when building and logging InstallSnapshot messages.
// NOTE(review): the body is not visible in this extract; presumably returns totalChunks.
586 public int getTotalChunks() {
590 public boolean canSendNextChunk() {
591 // we only send a false if a chunk is sent but we have not received a reply yet
592 return replyReceivedForOffset == offset;
595 public boolean isLastChunk(int chunkIndex) {
596 return totalChunks == chunkIndex;
/**
 * Records that a reply was received for the chunk at the current offset.
 * Both visible assignments set replyReceivedForOffset to offset.
 * NOTE(review): the branch structure and any further per-branch state changes
 * are not visible in this extract.
 */
599 public void markSendStatus(boolean success) {
601 // if the chunk sent was successful
602 replyReceivedForOffset = offset;
605 // if the chunk sent was failure
606 replyReceivedForOffset = offset;
/**
 * Returns the slice of the snapshot that starts at the offset reported by
 * incrementOffset() and spans at most one configured chunk size.
 * NOTE(review): the framing between the two size adjustments (for example an
 * else) is not visible in this extract.
 */
611 public ByteString getNextChunk() {
612 int snapshotLength = getSnapshotBytes().size();
613 int start = incrementOffset();
// Default slice length is the configured chunk size.
614 int size = context.getConfigParams().getSnapshotChunkSize();
// The snapshot is smaller than one chunk: the slice length is the snapshot length.
615 if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
616 size = snapshotLength;
// A full chunk would run past the end: the slice is whatever remains after start.
618 if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
619 size = snapshotLength - start;
623 if(LOG.isDebugEnabled()) {
624 LOG.debug("length={}, offset={},size={}",
625 snapshotLength, start, size);
627 return getSnapshotBytes().substring(start, start + size);