2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import akka.event.LoggingAdapter;
15 import com.google.common.base.Preconditions;
16 import com.google.protobuf.ByteString;
17 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
18 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
20 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
21 import org.opendaylight.controller.cluster.raft.RaftActorContext;
22 import org.opendaylight.controller.cluster.raft.RaftState;
23 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
24 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
25 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
26 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
28 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
30 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
31 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
32 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
33 import scala.concurrent.duration.FiniteDuration;
35 import java.io.IOException;
36 import java.util.ArrayList;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.List;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicLong;
46 * The behavior of a RaftActor when it is in the Leader state
50 * <li> Upon election: send initial empty AppendEntries RPCs
51 * (heartbeat) to each server; repeat during idle periods to
52 * prevent election timeouts (§5.2)
53 * <li> If command received from client: append entry to local log,
54 * respond after entry applied to state machine (§5.3)
55 * <li> If last log index ≥ nextIndex for a follower: send
56 * AppendEntries RPC with log entries starting at nextIndex
58 * <li> If successful: update nextIndex and matchIndex for the follower (§5.3)
60 * <li> If AppendEntries fails because of log inconsistency:
61 * decrement nextIndex and retry (§5.3)
63 * <li> If there exists an N such that N > commitIndex, a majority
64 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
65 * set commitIndex = N (§5.3, §5.4).
67 public class Leader extends AbstractRaftActorBehavior {
// Per-follower replication progress (next/match index), keyed by follower id;
// populated in the constructor for every peer.
// NOTE(review): the initializer of this field is not visible in this listing.
70 protected final Map<String, FollowerLogInformation> followerToLog =
// In-progress chunked snapshot transfers, keyed by follower id. An entry is
// created when the first chunk is built (getNextSnapshotChunk) and removed once
// the last chunk is acknowledged (handleInstallSnapshotReply).
72 protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
// Ids of the peers this leader replicates to, from context.getPeerAddresses().
74 private final Set<String> followers;
// Handles of the pending scheduled SendHeartBeat / SendInstallSnapshot
// messages, kept so they can be cancelled.
76 private Cancellable heartbeatSchedule = null;
// NOTE(review): not read or assigned anywhere in the visible code — possibly unused.
77 private Cancellable appendEntriesSchedule = null;
78 private Cancellable installSnapshotSchedule = null;
// Outstanding client requests; looked up by log index in findClientRequestTracker.
80 private List<ClientRequestTracker> trackerList = new ArrayList<>();
// Number of servers (this leader included) whose matchIndex must reach N before
// N may be committed; 0 when there are no followers.
82 private final int minReplicationCount;
84 private final LoggingAdapter LOG;
// Creates the Leader behavior for the given actor context. On construction it
// - sets the commit index to the last local log index (if the log is non-empty),
// - creates a FollowerLogInformation for every peer id,
// - computes minReplicationCount (strict majority of followers + this leader),
// - schedules an immediate heartbeat and the install-snapshot check.
// NOTE(review): several lines are missing from this listing (e.g. the super(...)
// call, the last FollowerLogInformationImpl argument and the else branch header
// before "minReplicationCount = 0"); only the visible lines are documented.
86 public Leader(RaftActorContext context) {
89 LOG = context.getLogger();
// NOTE(review): this treats every local entry as committed on election,
// including entries from earlier terms. Raft (§5.4.2) commits such entries only
// indirectly, once an entry from the current term reaches a majority — confirm
// this is intended.
91 if (lastIndex() >= 0) {
92 context.setCommitIndex(lastIndex());
95 followers = context.getPeerAddresses().keySet();
97 for (String followerId : followers) {
98 FollowerLogInformation followerLogInformation =
99 new FollowerLogInformationImpl(followerId,
// Seeded from the leader's lastIndex(); presumably the follower's initial
// nextIndex — confirm against FollowerLogInformationImpl's constructor.
100 new AtomicLong(lastIndex()),
103 followerToLog.put(followerId, followerLogInformation);
106 if(LOG.isDebugEnabled()) {
107 LOG.debug("Election:Leader has following peers:" + followers);
// (followers + 1) servers in total; n / 2 + 1 is a strict majority of n.
// With no followers the count stays 0 and replicate() commits directly.
110 if (followers.size() > 0) {
111 minReplicationCount = (followers.size() + 1) / 2 + 1;
113 minReplicationCount = 0;
117 // Immediately schedule a heartbeat
118 // Upon election: send initial empty AppendEntries RPCs
119 // (heartbeat) to each server; repeat during idle periods to
120 // prevent election timeouts (§5.2)
121 scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
// The install-snapshot check is scheduled at 1000x the heartbeat interval,
// expressed in the heartbeat interval's own time unit.
123 scheduleInstallSnapshotCheck(
124 new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
125 context.getConfigParams().getHeartBeatInterval().unit())
// Handles an AppendEntries sent to this leader by another server. The visible
// code only logs the message at debug level; a newer-term step-down is handled
// by the RaftRPC term check in handleMessage before super.handleMessage is
// reached (how super dispatches to this method is not visible here).
// NOTE(review): the return statement is not visible in this listing.
130 @Override protected RaftState handleAppendEntries(ActorRef sender,
131 AppendEntries appendEntries) {
133 if(LOG.isDebugEnabled()) {
134 LOG.debug(appendEntries.toString());
// Processes a follower's reply to AppendEntries: updates that follower's
// match/next index, then tries to advance the commit index and applies newly
// committed entries to the state machine.
// NOTE(review): several lines (early return, the replicatedCount increment, the
// term comparison operand and the loop exit) are not visible in this listing.
140 @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
141 AppendEntriesReply appendEntriesReply) {
143 if(! appendEntriesReply.isSuccess()) {
144 if(LOG.isDebugEnabled()) {
145 LOG.debug(appendEntriesReply.toString());
149 // Update the FollowerLogInformation
150 String followerId = appendEntriesReply.getFollowerId();
151 FollowerLogInformation followerLogInformation =
152 followerToLog.get(followerId);
// Replies from ids not in followerToLog are logged as errors (presumably
// followed by an early return — not visible here).
154 if(followerLogInformation == null){
155 LOG.error("Unknown follower {}", followerId);
// Success: the follower's log matches up to its reported last index.
159 if (appendEntriesReply.isSuccess()) {
160 followerLogInformation
161 .setMatchIndex(appendEntriesReply.getLogLastIndex());
162 followerLogInformation
163 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
166 // TODO: When we find that the follower is out of sync with the
167 // Leader we simply decrement that followers next index by 1.
168 // Would it be possible to do better than this? The RAFT spec
169 // does not explicitly deal with it but may be something for us to
172 followerLogInformation.decrNextIndex();
175 // Now figure out if this reply warrants a change in the commitIndex
176 // If there exists an N such that N > commitIndex, a majority
177 // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
178 // set commitIndex = N (§5.3, §5.4).
// NOTE(review): this loop has no condition; its exit is not visible in this
// listing. Confirm it terminates when minReplicationCount is 0 (no followers),
// since then "replicatedCount >= minReplicationCount" always holds.
179 for (long N = context.getCommitIndex() + 1; ; N++) {
// Starts at 1 to count this leader, which always holds entry N.
180 int replicatedCount = 1;
182 for (FollowerLogInformation info : followerToLog.values()) {
183 if (info.getMatchIndex().get() >= N) {
188 if (replicatedCount >= minReplicationCount) {
189 ReplicatedLogEntry replicatedLogEntry =
190 context.getReplicatedLog().get(N);
191 if (replicatedLogEntry != null
192 && replicatedLogEntry.getTerm()
194 context.setCommitIndex(N);
201 // Apply the change to the state machine
202 if (context.getCommitIndex() > context.getLastApplied()) {
203 applyLogToStateMachine(context.getCommitIndex());
// Removes the tracker for the given log index from trackerList, if present.
// NOTE(review): the return statement is not visible in this listing;
// presumably it returns the removed tracker (or null when none matched).
209 protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
211 ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
212 if(toRemove != null) {
213 trackerList.remove(toRemove);
// Linear scan of trackerList for the tracker whose index equals logIndex.
// NOTE(review): the return statements (match / no match) are not visible in
// this listing.
219 protected ClientRequestTracker findClientRequestTracker(long logIndex) {
220 for (ClientRequestTracker tracker : trackerList) {
221 if (tracker.getIndex() == logIndex) {
// Handles a RequestVoteReply received while already leader.
// NOTE(review): the method body is not visible in this listing.
229 @Override protected RaftState handleRequestVoteReply(ActorRef sender,
230 RequestVoteReply requestVoteReply) {
234 @Override public RaftState state() {
235 return RaftState.Leader;
// Entry point for every message while in the Leader state.
// 1. A RaftRPC whose term is newer than ours makes this leader persist that
//    term and step down by returning RaftState.Follower.
// 2. Leader-internal messages (SendHeartBeat, SendInstallSnapshot, Replicate,
//    InstallSnapshotReply) are handled here; SendHeartBeat returns directly.
// 3. The next heartbeat is rescheduled and the message is passed to
//    super.handleMessage.
// NOTE(review): lines are missing from this listing between the dispatch chain
// and the reschedule (likely a try/finally); confirm the exact structure.
238 @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
239 Preconditions.checkNotNull(sender, "sender should not be null");
241 Object message = fromSerializableMessage(originalMessage);
243 if (message instanceof RaftRPC) {
244 RaftRPC rpc = (RaftRPC) message;
245 // If RPC request or response contains term T > currentTerm:
246 // set currentTerm = T, convert to follower (§5.1)
247 // This applies to all RPC messages and responses
// The second argument is presumably the voted-for id, cleared for the new term.
248 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
249 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
250 return RaftState.Follower;
255 if (message instanceof SendHeartBeat) {
256 return sendHeartBeat();
257 } else if(message instanceof SendInstallSnapshot) {
258 installSnapshotIfNeeded();
259 } else if (message instanceof Replicate) {
260 replicate((Replicate) message);
261 } else if (message instanceof InstallSnapshotReply){
262 handleInstallSnapshotReply(
263 (InstallSnapshotReply) message);
// Pushes the next heartbeat out by one interval. scheduleHeartBeat's comment
// says heartbeats are unnecessary while other traffic is flowing.
266 scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
269 return super.handleMessage(sender, message);
272 private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
273 String followerId = reply.getFollowerId();
274 FollowerToSnapshot followerToSnapshot =
275 mapFollowerToSnapshot.get(followerId);
277 if (followerToSnapshot != null &&
278 followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
280 if (reply.isSuccess()) {
281 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
282 //this was the last chunk reply
283 if(LOG.isDebugEnabled()) {
284 LOG.debug("InstallSnapshotReply received, " +
285 "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
286 reply.getChunkIndex(), followerId,
287 context.getReplicatedLog().getSnapshotIndex() + 1
291 FollowerLogInformation followerLogInformation =
292 followerToLog.get(followerId);
293 followerLogInformation.setMatchIndex(
294 context.getReplicatedLog().getSnapshotIndex());
295 followerLogInformation.setNextIndex(
296 context.getReplicatedLog().getSnapshotIndex() + 1);
297 mapFollowerToSnapshot.remove(followerId);
299 if(LOG.isDebugEnabled()) {
300 LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
301 followerToLog.get(followerId).getNextIndex().get());
305 followerToSnapshot.markSendStatus(true);
308 LOG.info("InstallSnapshotReply received, " +
309 "sending snapshot chunk failed, Will retry, Chunk:{}",
310 reply.getChunkIndex()
312 followerToSnapshot.markSendStatus(false);
316 LOG.error("ERROR!!" +
317 "FollowerId in InstallSnapshotReply not known to Leader" +
318 " or Chunk Index in InstallSnapshotReply not matching {} != {}",
319 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
// Handles a Replicate request for a newly appended log entry. It creates a
// ClientRequestTracker for the entry so the client can be answered later. With
// no followers, the entry is committed and applied immediately.
// NOTE(review): the tracker is presumably added to trackerList, and the branch
// taken when followers exist presumably calls sendAppendEntries() (which has no
// visible caller). Those lines are not visible in this listing.
324 private void replicate(Replicate replicate) {
325 long logIndex = replicate.getReplicatedLogEntry().getIndex();
327 if(LOG.isDebugEnabled()) {
328 LOG.debug("Replicate message " + logIndex);
331 // Create a tracker entry we will use this later to notify the
334 new ClientRequestTrackerImpl(replicate.getClientActor(),
335 replicate.getIdentifier(),
// Single-node cluster: no replication needed, commit and apply right away.
339 if (followers.size() == 0) {
340 context.setCommitIndex(logIndex);
341 applyLogToStateMachine(logIndex);
// Sends the next piece of replication traffic to every follower that has an
// actor selection:
// - if a snapshot transfer is in progress, the next chunk is sent only once the
//   previous chunk has been answered (canSendNextChunk);
// - else if the entry at the follower's nextIndex is in the local log, an
//   AppendEntries carrying that single entry is built;
// - else, if 0 <= nextIndex <= leader lastIndex, a SendInstallSnapshot is sent to
//   this actor (handled by installSnapshotIfNeeded in handleMessage);
// - an AppendEntries with the (possibly empty) entries list is also built.
// NOTE(review): the tell(...) calls, returns and else-branch headers are not
// visible in this listing, so it cannot be determined here whether the last
// AppendEntries is sent instead of, or in addition to, the snapshot request.
347 private void sendAppendEntries() {
348 // Send an AppendEntries to all followers
349 for (String followerId : followers) {
350 ActorSelection followerActor = context.getPeerActorSelection(followerId);
352 if (followerActor != null) {
353 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
354 long followerNextIndex = followerLogInformation.getNextIndex().get();
355 List<ReplicatedLogEntry> entries = Collections.emptyList();
// Snapshot transfer in progress for this follower.
357 if (mapFollowerToSnapshot.get(followerId) != null) {
358 if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
359 sendSnapshotChunk(followerActor, followerId);
364 if (context.getReplicatedLog().isPresent(followerNextIndex)) {
365 // FIXME : Sending one entry at a time
366 entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
369 new AppendEntries(currentTerm(), context.getId(),
370 prevLogIndex(followerNextIndex),
371 prevLogTerm(followerNextIndex), entries,
372 context.getCommitIndex()).toSerializable(),
377 // if the followers next index is not present in the leaders log, then snapshot should be sent
// leaderSnapShotIndex is only used for the debug log below.
378 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
379 long leaderLastIndex = context.getReplicatedLog().lastIndex();
380 if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
381 // if the follower is just not starting and leader's index
382 // is more than followers index
383 if(LOG.isDebugEnabled()) {
384 LOG.debug("SendInstallSnapshot to follower:{}," +
385 "follower-nextIndex:{}, leader-snapshot-index:{}, " +
386 "leader-last-index:{}", followerId,
387 followerNextIndex, leaderSnapShotIndex, leaderLastIndex
// Defer the actual snapshot send to installSnapshotIfNeeded via a self-message.
391 actor().tell(new SendInstallSnapshot(), actor());
394 new AppendEntries(currentTerm(), context.getId(),
395 prevLogIndex(followerNextIndex),
396 prevLogTerm(followerNextIndex), entries,
397 context.getCommitIndex()).toSerializable(),
408 * An installSnapshot is scheduled at a interval that is a multiple of
409 * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
410 * snapshots at every heartbeat.
412 private void installSnapshotIfNeeded(){
413 for (String followerId : followers) {
414 ActorSelection followerActor =
415 context.getPeerActorSelection(followerId);
417 if(followerActor != null) {
418 FollowerLogInformation followerLogInformation =
419 followerToLog.get(followerId);
421 long nextIndex = followerLogInformation.getNextIndex().get();
423 if (!context.getReplicatedLog().isPresent(nextIndex) &&
424 context.getReplicatedLog().isInSnapshot(nextIndex)) {
425 sendSnapshotChunk(followerActor, followerId);
432 * Sends a snapshot chunk to a given follower
433 * InstallSnapshot should qualify as a heartbeat too.
// Builds an InstallSnapshot message for the follower from the snapshot index and
// term, the next chunk of bytes, the chunk index and the total chunk count, then
// logs progress at info level. An IOException from building the chunk
// (getNextSnapshotChunk declares it) is logged and not rethrown.
// NOTE(review): the enclosing try and the followerActor.tell(...) call are not
// visible in this listing.
435 private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
// Java evaluates arguments left to right. getNextSnapshotChunk, which creates
// the mapFollowerToSnapshot entry on the first chunk, therefore runs before the
// two mapFollowerToSnapshot.get(...) calls below. Reordering these arguments
// would cause a NullPointerException on the first chunk.
438 new InstallSnapshot(currentTerm(), context.getId(),
439 context.getReplicatedLog().getSnapshotIndex(),
440 context.getReplicatedLog().getSnapshotTerm(),
441 getNextSnapshotChunk(followerId,
442 context.getReplicatedLog().getSnapshot()),
443 mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
444 mapFollowerToSnapshot.get(followerId).getTotalChunks()
448 LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
449 followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
450 mapFollowerToSnapshot.get(followerId).getTotalChunks());
451 } catch (IOException e) {
452 LOG.error("InstallSnapshot failed for Leader.", e);
457 * Acccepts snaphot as ByteString, enters into map for future chunks
458 * creates and return a ByteString chunk
// Returns the next chunk of the snapshot for this follower. On the first call
// for a follower, a FollowerToSnapshot is created from snapshotBytes and stored in
// mapFollowerToSnapshot. Later calls reuse that entry and ignore snapshotBytes.
// NOTE(review): the return statement is not visible in this listing;
// presumably it returns nextChunk.
460 private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
461 FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
462 if (followerToSnapshot == null) {
463 followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
464 mapFollowerToSnapshot.put(followerId, followerToSnapshot);
466 ByteString nextChunk = followerToSnapshot.getNextChunk();
467 if(LOG.isDebugEnabled()) {
468 LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
// Handles SendHeartBeat (its result is returned directly from handleMessage).
// Work is done only when there are followers.
// NOTE(review): the body of the if and the return value are not visible in
// this listing. sendAppendEntries() has no visible caller and is presumably
// invoked here.
474 private RaftState sendHeartBeat() {
475 if (followers.size() > 0) {
481 private void stopHeartBeat() {
482 if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
483 heartbeatSchedule.cancel();
487 private void stopInstallSnapshotSchedule() {
488 if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
489 installSnapshotSchedule.cancel();
// Schedules a single SendHeartBeat message to this actor through the actor
// system's scheduler. Per the comment below, scheduling is skipped when there
// are no followers.
// NOTE(review): the early return, a presumable stopHeartBeat() call, the
// assignment to heartbeatSchedule and the delay argument to scheduleOnce are
// not visible in this listing.
493 private void scheduleHeartBeat(FiniteDuration interval) {
494 if(followers.size() == 0){
495 // Optimization - do not bother scheduling a heartbeat as there are
502 // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
503 // message is sent to itself.
504 // Scheduling the heartbeat only once here because heartbeats do not
505 // need to be sent if there are other messages being sent to the remote
508 context.getActorSystem().scheduler().scheduleOnce(
510 context.getActor(), new SendHeartBeat(),
511 context.getActorSystem().dispatcher(), context.getActor());
// Schedules a single SendInstallSnapshot message to this actor, after cancelling
// any previously scheduled check. The handle is kept in installSnapshotSchedule.
// Scheduling is skipped when there are no followers.
// NOTE(review): the two comments inside were copied from scheduleHeartBeat and
// mention heartbeats / append entries, but this schedules SendInstallSnapshot.
// NOTE(review): scheduleOnce fires once, and the only visible caller is the
// constructor. The check therefore appears to run only once rather than
// periodically; sendAppendEntries also requests it via SendInstallSnapshot.
// Confirm whether rescheduling was intended. The delay argument and the early
// return are not visible in this listing.
515 private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
516 if(followers.size() == 0){
517 // Optimization - do not bother scheduling a heartbeat as there are
522 stopInstallSnapshotSchedule();
524 // Schedule a message to send append entries to followers that can
525 // accept an append entries with some data in it
526 installSnapshotSchedule =
527 context.getActorSystem().scheduler().scheduleOnce(
529 context.getActor(), new SendInstallSnapshot(),
530 context.getActorSystem().dispatcher(), context.getActor());
// Called when this behavior is closed.
// NOTE(review): the body is not visible in this listing. Confirm it cancels both
// heartbeatSchedule and installSnapshotSchedule: stopInstallSnapshotSchedule has
// no other visible caller than scheduleInstallSnapshotCheck.
535 @Override public void close() throws Exception {
539 @Override public String getLeaderId() {
540 return context.getId();
544 * Encapsulates the snapshot bytestring and handles the logic of sending
// State of one follower's chunked snapshot transfer. This is a non-static inner
// class: it reads the enclosing Leader's context (chunk size) and LOG.
547 protected class FollowerToSnapshot {
// The complete serialized snapshot being transferred.
548 private ByteString snapshotBytes;
// Start offset, within snapshotBytes, of the chunk most recently built by getNextChunk().
549 private int offset = 0;
550 // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
551 private int replyReceivedForOffset;
552 // if replyStatus is false, the previous chunk is attempted
553 private boolean replyStatus = false;
// Index of the chunk most recently sent; replies are matched against it.
// NOTE(review): isLastChunk compares it with totalChunks, which implies
// numbering from 1; the initial value is not visible in this listing.
554 private int chunkIndex;
// Number of chunks: ceiling of snapshot size / snapshot chunk size.
555 private int totalChunks;
// Captures the snapshot and computes how many chunks of
// getSnapshotChunkSize() bytes it needs (integer ceiling division; assumes a
// positive chunk size). replyReceivedForOffset starts at -1 while offset starts
// at 0, so canSendNextChunk() is false until the first reply is recorded.
// NOTE(review): the chunkIndex initialisation and the remaining debug-log
// arguments are not visible in this listing.
557 public FollowerToSnapshot(ByteString snapshotBytes) {
558 this.snapshotBytes = snapshotBytes;
559 replyReceivedForOffset = -1;
561 int size = snapshotBytes.size();
562 totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
563 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
564 if(LOG.isDebugEnabled()) {
565 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
570 public ByteString getSnapshotBytes() {
571 return snapshotBytes;
// Advances offset by one snapshot chunk size; its int result is used by
// getNextChunk() as the start of the next chunk.
// NOTE(review): the comment below implies the advance is skipped when the
// previous chunk failed (replyStatus false). That guard and the return statement
// are not visible in this listing.
574 public int incrementOffset() {
576 // if prev chunk failed, we would want to sent the same chunk again
577 offset = offset + context.getConfigParams().getSnapshotChunkSize();
// Advances chunkIndex by one; the int result is sent as the chunk index of the
// next InstallSnapshot.
// NOTE(review): as with incrementOffset, the failure guard implied by the
// comment and the return statement are not visible in this listing.
582 public int incrementChunkIndex() {
584 // if prev chunk failed, we would want to sent the same chunk again
585 chunkIndex = chunkIndex + 1;
// Accessor for the index of the chunk most recently sent (body not visible in this listing).
590 public int getChunkIndex() {
// Accessor for the total number of chunks (body not visible in this listing).
594 public int getTotalChunks() {
598 public boolean canSendNextChunk() {
599 // we only send a false if a chunk is sent but we have not received a reply yet
600 return replyReceivedForOffset == offset;
603 public boolean isLastChunk(int chunkIndex) {
604 return totalChunks == chunkIndex;
// Records the follower's reply for the chunk at the current offset. In both the
// success and the failure case, replyReceivedForOffset is set to offset, which
// makes canSendNextChunk() true again.
// NOTE(review): the if/else on `success` and the presumable replyStatus updates
// are not visible in this listing. replyStatus is what decides whether the next
// chunk advances or is resent.
607 public void markSendStatus(boolean success) {
609 // if the chunk sent was successful
610 replyReceivedForOffset = offset;
613 // if the chunk sent was failure
614 replyReceivedForOffset = offset;
619 public ByteString getNextChunk() {
620 int snapshotLength = getSnapshotBytes().size();
621 int start = incrementOffset();
622 int size = context.getConfigParams().getSnapshotChunkSize();
623 if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
624 size = snapshotLength;
626 if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
627 size = snapshotLength - start;
631 if(LOG.isDebugEnabled()) {
632 LOG.debug("length={}, offset={},size={}",
633 snapshotLength, start, size);
635 return getSnapshotBytes().substring(start, start + size);