2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
17 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftState;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import scala.concurrent.duration.FiniteDuration;
34 import java.util.ArrayList;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicLong;
44 * The behavior of a RaftActor when it is in the Leader state
48 * <li> Upon election: send initial empty AppendEntries RPCs
49 * (heartbeat) to each server; repeat during idle periods to
50 * prevent election timeouts (§5.2)
51 * <li> If command received from client: append entry to local log,
52 * respond after entry applied to state machine (§5.3)
53 * <li> If last log index ≥ nextIndex for a follower: send
54 * AppendEntries RPC with log entries starting at nextIndex
56 * <li> If successful: update nextIndex and matchIndex for
58 * <li> If AppendEntries fails because of log inconsistency:
59 * decrement nextIndex and retry (§5.3)
61 * <li> If there exists an N such that N > commitIndex, a majority
62 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
63 * set commitIndex = N (§5.3, §5.4).
65 public class Leader extends AbstractRaftActorBehavior {
/** Replication progress of each follower, keyed by follower id; one entry per peer is created in the constructor. */
68 private final Map<String, FollowerLogInformation> followerToLog =
/**
 * Ids of the peers this leader replicates to, assigned from
 * {@code context.getPeerAddresses().keySet()} in the constructor.
 * NOTE(review): if getPeerAddresses() returns a live map this is a view of its keys,
 * not a copy — confirm peer membership cannot change while this Leader is active.
 */
71 private final Set<String> followers;
/** Handle for the scheduled heartbeat message; cancelled by stopHeartBeat(). */
73 private Cancellable heartbeatSchedule = null;
/** NOTE(review): not referenced anywhere in the visible code — possibly unused. */
74 private Cancellable appendEntriesSchedule = null;
/** Handle for the scheduled SendInstallSnapshot message; set by scheduleInstallSnapshotCheck() and cancelled by stopInstallSnapshotSchedule(). */
75 private Cancellable installSnapshotSchedule = null;
/** Trackers for client requests (created in replicate()); searched by log index in findClientRequestTracker(). */
77 private List<ClientRequestTracker> trackerList = new ArrayList<>();
/**
 * Number of nodes, counting the leader itself, that must hold an entry before the
 * commit index may advance to it: (followers + 1) / 2 + 1 when there are followers,
 * otherwise 0.
 */
79 private final int minReplicationCount;
/**
 * Creates the Leader-state behavior for the RaftActor described by {@code context}.
 * <p>
 * If the log is non-empty ({@code lastIndex() >= 0}) the commit index is set to the
 * last log index. One {@code FollowerLogInformationImpl} is created per peer in
 * {@code context.getPeerAddresses()}, the replication quorum is computed, a heartbeat
 * is scheduled immediately and the first install-snapshot check is scheduled.
 * <p>
 * NOTE(review): committing up to lastIndex() on election does not check that those
 * entries are on a majority of servers; the commit rule quoted on this class requires
 * a majority of matchIndex values and log[N].term == currentTerm — confirm this is intended.
 *
 * @param context the RaftActorContext of the owning RaftActor
 */
81 public Leader(RaftActorContext context) {
84 if (lastIndex() >= 0) {
85 context.setCommitIndex(lastIndex());
88 followers = context.getPeerAddresses().keySet();
90 for (String followerId : followers) {
91 FollowerLogInformation followerLogInformation =
92 new FollowerLogInformationImpl(followerId,
93 new AtomicLong(lastIndex()),
96 followerToLog.put(followerId, followerLogInformation);
99 context.getLogger().debug("Election:Leader has following peers:"+ followers);
// Quorum counting the leader itself, e.g. 2 followers -> (2 + 1) / 2 + 1 = 2 of 3 nodes.
101 if (followers.size() > 0) {
102 minReplicationCount = (followers.size() + 1) / 2 + 1;
104 minReplicationCount = 0;
108 // Immediately schedule a heartbeat
109 // Upon election: send initial empty AppendEntries RPCs
110 // (heartbeat) to each server; repeat during idle periods to
111 // prevent election timeouts (§5.2)
112 scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
// First install-snapshot check after 1000 heartbeat intervals (same time unit).
114 scheduleInstallSnapshotCheck(
115 new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
116 context.getConfigParams().getHeartBeatInterval().unit())
/**
 * Handles an AppendEntries RPC received while this actor is the leader.
 * The received message is logged at info level.
 *
 * @param sender the actor that sent the RPC
 * @param appendEntries the received AppendEntries RPC
 * @return the resulting RaftState
 */
121 @Override protected RaftState handleAppendEntries(ActorRef sender,
122 AppendEntries appendEntries) {
124 context.getLogger().info("Leader: Received {}", appendEntries.toString());
/**
 * Processes a follower's reply to an AppendEntries RPC.
 * <ul>
 * <li>Unsuccessful replies are logged at info level; replies from a follower that
 * is not in {@code followerToLog} are logged as errors.</li>
 * <li>On success the follower's match index becomes the reply's last log index and
 * its next index the entry after it; otherwise its next index is decremented so a
 * later AppendEntries retries from an earlier entry.</li>
 * <li>Indices N = commitIndex + 1, commitIndex + 2, ... are then checked: when at
 * least {@code minReplicationCount} nodes (the leader included) have a match index
 * of at least N and the entry at N satisfies the term check quoted in the body,
 * the commit index is set to N.</li>
 * <li>Finally, if the commit index is ahead of lastApplied, entries up to the commit
 * index are applied via {@code applyLogToStateMachine}.</li>
 * </ul>
 *
 * @param sender the actor that sent the reply
 * @param appendEntriesReply the follower's reply
 * @return the resulting RaftState
 */
129 @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
130 AppendEntriesReply appendEntriesReply) {
132 if(! appendEntriesReply.isSuccess()) {
134 .info("Leader: Received {}", appendEntriesReply.toString());
137 // Update the FollowerLogInformation
138 String followerId = appendEntriesReply.getFollowerId();
139 FollowerLogInformation followerLogInformation =
140 followerToLog.get(followerId);
142 if(followerLogInformation == null){
143 context.getLogger().error("Unknown follower {}", followerId);
147 if (appendEntriesReply.isSuccess()) {
148 followerLogInformation
149 .setMatchIndex(appendEntriesReply.getLogLastIndex());
150 followerLogInformation
151 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
154 // TODO: When we find that the follower is out of sync with the
155 // Leader we simply decrement that follower's next index by 1.
156 // Would it be possible to do better than this? The RAFT spec
157 // does not explicitly deal with it but may be something for us to
160 followerLogInformation.decrNextIndex();
163 // Now figure out if this reply warrants a change in the commitIndex
164 // If there exists an N such that N > commitIndex, a majority
165 // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
166 // set commitIndex = N (§5.3, §5.4).
167 for (long N = context.getCommitIndex() + 1; ; N++) {
// Start at 1: the leader's own log counts toward the majority.
168 int replicatedCount = 1;
170 for (FollowerLogInformation info : followerToLog.values()) {
171 if (info.getMatchIndex().get() >= N) {
176 if (replicatedCount >= minReplicationCount) {
177 ReplicatedLogEntry replicatedLogEntry =
178 context.getReplicatedLog().get(N);
179 if (replicatedLogEntry != null
180 && replicatedLogEntry.getTerm()
182 context.setCommitIndex(N);
189 // Apply the change to the state machine
190 if (context.getCommitIndex() > context.getLastApplied()) {
191 applyLogToStateMachine(context.getCommitIndex());
/**
 * Looks up the client request tracker registered for the given log index.
 * <p>
 * Scans {@code trackerList} linearly, comparing each tracker's {@code getIndex()}
 * with {@code logIndex}. NOTE(review): presumably returns null when no tracker
 * matches — TODO confirm.
 *
 * @param logIndex index of the replicated log entry
 * @return the tracker whose index equals {@code logIndex}
 */
197 protected ClientRequestTracker findClientRequestTracker(long logIndex) {
198 for (ClientRequestTracker tracker : trackerList) {
199 if (tracker.getIndex() == logIndex) {
/**
 * Handles a RequestVoteReply received while in the Leader state.
 * NOTE(review): this node has already won its election, so late vote replies are
 * presumably ignored here — confirm.
 *
 * @param sender the actor that sent the reply
 * @param requestVoteReply the received vote reply
 * @return the resulting RaftState
 */
207 @Override protected RaftState handleRequestVoteReply(ActorRef sender,
208 RequestVoteReply requestVoteReply) {
212 @Override public RaftState state() {
213 return RaftState.Leader;
/**
 * Entry point for every message delivered to this behavior.
 * <p>
 * The message is first converted with {@code fromSerializableMessage}. A RaftRPC whose
 * term is newer than the current term causes the term to be updated and persisted
 * (with {@code null} as the second argument, presumably clearing the vote — confirm)
 * and the actor steps down to Follower (§5.1). Leader-specific messages are then
 * dispatched: SendHeartBeat, SendInstallSnapshot, Replicate and InstallSnapshotReply.
 * {@code scheduleHeartBeat} is called with the configured heartbeat interval after
 * the dispatch chain. Except for SendHeartBeat, whose result is returned directly
 * from {@code sendHeartBeat()}, the message is finally passed to
 * {@code super.handleMessage}.
 *
 * @param sender the sending actor; must not be null
 * @param originalMessage the message as received, before fromSerializableMessage
 * @return the RaftState the actor should be in after handling the message
 */
216 @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
217 Preconditions.checkNotNull(sender, "sender should not be null");
219 Object message = fromSerializableMessage(originalMessage);
221 if (message instanceof RaftRPC) {
222 RaftRPC rpc = (RaftRPC) message;
223 // If RPC request or response contains term T > currentTerm:
224 // set currentTerm = T, convert to follower (§5.1)
225 // This applies to all RPC messages and responses
226 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
227 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
228 return RaftState.Follower;
233 if (message instanceof SendHeartBeat) {
234 return sendHeartBeat();
235 } else if(message instanceof SendInstallSnapshot) {
236 installSnapshotIfNeeded();
237 } else if (message instanceof Replicate) {
238 replicate((Replicate) message);
239 } else if (message instanceof InstallSnapshotReply){
240 handleInstallSnapshotReply(
241 (InstallSnapshotReply) message);
// Re-arm the heartbeat after handling a message.
244 scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
247 return super.handleMessage(sender, message);
250 private void handleInstallSnapshotReply(InstallSnapshotReply message) {
251 InstallSnapshotReply reply = message;
252 String followerId = reply.getFollowerId();
253 FollowerLogInformation followerLogInformation =
254 followerToLog.get(followerId);
256 followerLogInformation
257 .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
258 followerLogInformation
259 .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
/**
 * Handles a Replicate message carrying a replicated log entry.
 * <p>
 * When there are no followers the commit index is set straight to the entry's index
 * and an {@code ApplyState} for the client actor, identifier and entry is sent with
 * {@code tell}. Otherwise a {@code ClientRequestTrackerImpl} is created for the client
 * so it can be notified later (presumably followed by an AppendEntries round to the
 * followers — confirm).
 *
 * @param replicate the entry to replicate plus the requesting client actor and request identifier
 */
262 private void replicate(Replicate replicate) {
263 long logIndex = replicate.getReplicatedLogEntry().getIndex();
265 context.getLogger().debug("Replicate message " + logIndex);
// Single-node cluster: nothing to wait for, so commit immediately.
267 if (followers.size() == 0) {
268 context.setCommitIndex(
269 replicate.getReplicatedLogEntry().getIndex());
272 .tell(new ApplyState(replicate.getClientActor(),
273 replicate.getIdentifier(),
274 replicate.getReplicatedLogEntry()),
279 // Create a tracker entry that is used later to notify the
282 new ClientRequestTrackerImpl(replicate.getClientActor(),
283 replicate.getIdentifier(),
/**
 * Sends an AppendEntries RPC to every follower whose actor selection is available.
 * <p>
 * Each RPC starts at that follower's next index. If the entry at the next index is
 * present in the replicated log, entries are taken with {@code getFrom(nextIndex, 1)};
 * otherwise the entry list is empty, which acts as a heartbeat. The RPC carries the
 * current term, this actor's id, the previous log index/term for the next index and
 * the leader's commit index.
 */
291 private void sendAppendEntries() {
292 // Send an AppendEntries to all followers
293 for (String followerId : followers) {
294 ActorSelection followerActor =
295 context.getPeerActorSelection(followerId);
297 if (followerActor != null) {
298 FollowerLogInformation followerLogInformation =
299 followerToLog.get(followerId);
301 long nextIndex = followerLogInformation.getNextIndex().get();
303 List<ReplicatedLogEntry> entries = Collections.emptyList();
305 if (context.getReplicatedLog().isPresent(nextIndex)) {
// NOTE(review): the TODO below describes sending all entries from nextIndex, but the
// call further down passes 1 to getFrom — confirm getFrom's second argument and
// update or remove the TODO accordingly.
306 // TODO: Instead of sending all entries from nextIndex
307 // only send a fixed number of entries to each follower
308 // This is to avoid the situation where there are a lot of
309 // entries to install for a fresh follower or to a follower
310 // that has fallen too far behind with the log but yet is not
311 // eligible to receive a snapshot
313 context.getReplicatedLog().getFrom(nextIndex, 1);
317 new AppendEntries(currentTerm(), context.getId(),
318 prevLogIndex(nextIndex),
319 prevLogTerm(nextIndex), entries,
320 context.getCommitIndex()).toSerializable(),
328 * An install-snapshot check is scheduled at an interval that is a multiple of
329 * the heartbeat interval. This avoids having to check whether a snapshot must be
330 * installed on every heartbeat.
332 private void installSnapshotIfNeeded(){
// For every reachable follower whose next index is no longer in the replicated log but
// is covered by the snapshot, build an InstallSnapshot carrying the leader's current
// term and id plus the snapshot's index, term and data.
// NOTE(review): the check is scheduled with scheduleOnce and, in the visible code, only
// from the constructor — confirm it is rescheduled somewhere, otherwise lagging
// followers are only considered for a snapshot once.
333 for (String followerId : followers) {
334 ActorSelection followerActor =
335 context.getPeerActorSelection(followerId);
337 if(followerActor != null) {
338 FollowerLogInformation followerLogInformation =
339 followerToLog.get(followerId);
341 long nextIndex = followerLogInformation.getNextIndex().get();
343 if (!context.getReplicatedLog().isPresent(nextIndex) && context
344 .getReplicatedLog().isInSnapshot(nextIndex)) {
346 new InstallSnapshot(currentTerm(), context.getId(),
347 context.getReplicatedLog().getSnapshotIndex(),
348 context.getReplicatedLog().getSnapshotTerm(),
349 context.getReplicatedLog().getSnapshot()
/**
 * Handles the SendHeartBeat tick; the work is guarded on there being followers.
 * NOTE(review): presumably sends AppendEntries (empty entry lists act as heartbeats)
 * to the followers — confirm.
 *
 * @return the RaftState returned from handleMessage for a SendHeartBeat
 */
358 private RaftState sendHeartBeat() {
359 if (followers.size() > 0) {
365 private void stopHeartBeat() {
366 if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
367 heartbeatSchedule.cancel();
371 private void stopInstallSnapshotSchedule() {
372 if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
373 installSnapshotSchedule.cancel();
/**
 * Schedules a single SendHeartBeat message to this actor (sender is also this actor).
 * <p>
 * Nothing is scheduled when there are no followers. Only one message is scheduled
 * ({@code scheduleOnce}); handleMessage calls this again after handling each message,
 * so heartbeats are not needed while other traffic reaches the followers.
 * NOTE(review): presumably the previous heartbeat is cancelled first (stopHeartBeat)
 * and the new handle stored in heartbeatSchedule — confirm.
 *
 * @param interval delay before the heartbeat fires
 */
377 private void scheduleHeartBeat(FiniteDuration interval) {
378 if(followers.size() == 0){
379 // Optimization - do not bother scheduling a heartbeat as there are
386 // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
387 // message is sent to itself.
388 // Scheduling the heartbeat only once here because heartbeats do not
389 // need to be sent if there are other messages being sent to the remote
392 context.getActorSystem().scheduler().scheduleOnce(
394 context.getActor(), new SendHeartBeat(),
395 context.getActorSystem().dispatcher(), context.getActor());
/**
 * Schedules a single SendInstallSnapshot message to this actor after {@code interval}.
 * <p>
 * Nothing is scheduled when there are no followers. Any previously scheduled check is
 * cancelled first via stopInstallSnapshotSchedule(), and the new handle is stored in
 * {@code installSnapshotSchedule}.
 *
 * @param interval delay before the check fires
 */
399 private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
400 if(followers.size() == 0){
401 // Optimization - do not bother scheduling an install-snapshot check as there are
406 stopInstallSnapshotSchedule();
408 // Schedule a one-off SendInstallSnapshot message to this actor; when it
409 // arrives, followers that need a snapshot are sent one
410 installSnapshotSchedule =
411 context.getActorSystem().scheduler().scheduleOnce(
413 context.getActor(), new SendInstallSnapshot(),
414 context.getActorSystem().dispatcher(), context.getActor());
/**
 * Closes this Leader behavior.
 * NOTE(review): presumably cancels the scheduled heartbeat / install-snapshot
 * messages (stopHeartBeat / stopInstallSnapshotSchedule) — confirm.
 *
 * @throws Exception as declared by the overridden close()
 */
419 @Override public void close() throws Exception {
423 @Override public String getLeaderId() {
424 return context.getId();