2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
17 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftState;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
22 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import scala.concurrent.duration.FiniteDuration;
34 import java.util.ArrayList;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicLong;
43 * The behavior of a RaftActor when it is in the Leader state
47 * <li> Upon election: send initial empty AppendEntries RPCs
48 * (heartbeat) to each server; repeat during idle periods to
49 * prevent election timeouts (§5.2)
50 * <li> If command received from client: append entry to local log,
51 * respond after entry applied to state machine (§5.3)
52 * <li> If last log index ≥ nextIndex for a follower: send
53 * AppendEntries RPC with log entries starting at nextIndex
55 * <li> If successful: update nextIndex and matchIndex for
57 * <li> If AppendEntries fails because of log inconsistency:
58 * decrement nextIndex and retry (§5.3)
60 * <li> If there exists an N such that N > commitIndex, a majority
61 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
62 * set commitIndex = N (§5.3, §5.4).
64 public class Leader extends AbstractRaftActorBehavior {
// Replication bookkeeping per follower, keyed by follower id; filled in by the constructor.
67 private final Map<String, FollowerLogInformation> followerToLog =
// Actor selection used to reach each follower, keyed by follower id; filled in by the constructor.
70 private final Map<String, ActorSelection> followerToActor = new HashMap<>();
// Handle of the scheduled heartbeat; cancelled by stopHeartBeat().
72 private Cancellable heartbeatSchedule = null;
// NOTE(review): not referenced anywhere in the visible part of this class - confirm it is still needed.
73 private Cancellable appendEntriesSchedule = null;
// Handle of the scheduled install-snapshot check; assigned by scheduleInstallSnapshotCheck()
// and cancelled by stopInstallSnapshotSchedule().
74 private Cancellable installSnapshotSchedule = null;
// Trackers searched by findClientRequestTracker() using the log index.
76 private List<ClientRequestTracker> trackerList = new ArrayList<>();
// Replica count (this leader included) that handleAppendEntriesReply() requires before it
// moves the commit index; computed once in the constructor.
78 private final int minReplicationCount;
/**
 * Creates the leader behavior for the given actor context.
 *
 * <p>Registers log information and an actor selection for every peer returned by
 * {@code context.getPeerAddresses()}, derives the replication quorum, and schedules the
 * first heartbeat and the first install-snapshot check.
 *
 * @param context the Raft actor context this behavior reads from and updates
 */
80 public Leader(RaftActorContext context) {
// NOTE(review): when lastIndex() is non-negative it is recorded as the commit index right
// away. Raft normally advances commitIndex only after a majority has replicated an entry -
// confirm this shortcut is intended.
83 if (lastIndex() >= 0) {
84 context.setCommitIndex(lastIndex());
// Register every configured peer as a follower.
87 for (String followerId : context.getPeerAddresses().keySet()) {
88 FollowerLogInformation followerLogInformation =
89 new FollowerLogInformationImpl(followerId,
90 new AtomicLong(lastIndex()),
93 followerToActor.put(followerId,
94 context.actorSelection(context.getPeerAddress(followerId)));
96 followerToLog.put(followerId, followerLogInformation);
99 context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet());
// With followers present the quorum is a strict majority of (followers + this leader);
// the other visible assignment sets it to 0.
101 if (followerToActor.size() > 0) {
102 minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
104 minReplicationCount = 0;
108 // Immediately schedule a heartbeat
109 // Upon election: send initial empty AppendEntries RPCs
110 // (heartbeat) to each server; repeat during idle periods to
111 // prevent election timeouts (§5.2)
112 scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
// The first install-snapshot check is delayed by 1000 heartbeat intervals.
114 scheduleInstallSnapshotCheck(
115 new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
116 HEART_BEAT_INTERVAL.unit())
/**
 * Callback for an AppendEntries message that reaches this node while it is the leader.
 *
 * <p>NOTE(review): the method body is not visible in this excerpt, so the resulting state
 * cannot be documented from here.
 *
 * @param sender the actor reference passed along with the message
 * @param appendEntries the received AppendEntries message
 * @return the Raft state chosen by the body
 */
121 @Override protected RaftState handleAppendEntries(ActorRef sender,
122 AppendEntries appendEntries) {
/**
 * Processes a follower's answer to an AppendEntries request.
 *
 * <p>Updates the follower's match/next indices, then looks for a higher index that enough
 * replicas hold in order to move the commit index, and finally applies newly committed
 * entries to the state machine.
 *
 * @param sender the actor reference passed along with the reply
 * @param appendEntriesReply the reply; its follower id, success flag and last log index are read
 * @return the Raft state chosen by the part of the body that is not visible in this excerpt
 */
127 @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
128 AppendEntriesReply appendEntriesReply) {
130 // Update the FollowerLogInformation
131 String followerId = appendEntriesReply.getFollowerId();
// NOTE(review): the lookup result is dereferenced below without a null check; a reply that
// carries a follower id missing from followerToLog would fail here - confirm it cannot happen.
132 FollowerLogInformation followerLogInformation =
133 followerToLog.get(followerId);
// On success the follower's log is taken to match ours up to the last index it reported.
134 if (appendEntriesReply.isSuccess()) {
135 followerLogInformation
136 .setMatchIndex(appendEntriesReply.getLogLastIndex());
137 followerLogInformation
138 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
141 // TODO: When we find that the follower is out of sync with the
142 // Leader we simply decrement that followers next index by 1.
143 // Would it be possible to do better than this? The RAFT spec
144 // does not explicitly deal with it but may be something for us to
147 followerLogInformation.decrNextIndex();
150 // Now figure out if this reply warrants a change in the commitIndex
151 // If there exists an N such that N > commitIndex, a majority
152 // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
153 // set commitIndex = N (§5.3, §5.4).
// NOTE(review): the loop header has no termination condition; the exit must come from a
// statement that is not visible in this excerpt - verify.
154 for (long N = context.getCommitIndex() + 1; ; N++) {
// Counting starts at 1 so that the leader's own copy of the entry is included.
155 int replicatedCount = 1;
157 for (FollowerLogInformation info : followerToLog.values()) {
158 if (info.getMatchIndex().get() >= N) {
// Only once the quorum computed in the constructor is reached is the entry at N examined.
163 if (replicatedCount >= minReplicationCount) {
164 ReplicatedLogEntry replicatedLogEntry =
165 context.getReplicatedLog().get(N);
166 if (replicatedLogEntry != null
167 && replicatedLogEntry.getTerm()
169 context.setCommitIndex(N);
176 // Apply the change to the state machine
177 if (context.getCommitIndex() > context.getLastApplied()) {
178 applyLogToStateMachine(context.getCommitIndex());
/**
 * Searches trackerList for the tracker whose index equals the given log index.
 *
 * <p>NOTE(review): the return statements are not visible in this excerpt; presumably the
 * matching tracker is returned - confirm what is returned when nothing matches.
 *
 * @param logIndex the log index to look for
 * @return the result of the search
 */
184 protected ClientRequestTracker findClientRequestTracker(long logIndex) {
// Linear scan; the comparison is on the tracker's own index.
185 for (ClientRequestTracker tracker : trackerList) {
186 if (tracker.getIndex() == logIndex) {
/**
 * Callback for a RequestVoteReply message that reaches this node while it is the leader.
 *
 * <p>NOTE(review): the method body is not visible in this excerpt, so the resulting state
 * cannot be documented from here.
 *
 * @param sender the actor reference passed along with the reply
 * @param requestVoteReply the received reply
 * @return the Raft state chosen by the body
 */
194 @Override protected RaftState handleRequestVoteReply(ActorRef sender,
195 RequestVoteReply requestVoteReply) {
199 @Override public RaftState state() {
200 return RaftState.Leader;
/**
 * Entry point for every message delivered to the leader.
 *
 * <p>A Raft RPC carrying a term newer than the current one makes the leader persist that
 * term and step down to follower. Leader-specific messages (SendHeartBeat,
 * SendInstallSnapshot, Replicate, InstallSnapshotReply) are dispatched to their handlers;
 * the final visible statement hands the message to the superclass.
 *
 * @param sender the actor reference passed along with the message; must not be null
 * @param message the received message
 * @return the Raft state to be in after the message was handled
 */
203 @Override public RaftState handleMessage(ActorRef sender, Object message) {
204 Preconditions.checkNotNull(sender, "sender should not be null");
206 if (message instanceof RaftRPC) {
207 RaftRPC rpc = (RaftRPC) message;
208 // If RPC request or response contains term T > currentTerm:
209 // set currentTerm = T, convert to follower (§5.1)
210 // This applies to all RPC messages and responses
211 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
212 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
213 return RaftState.Follower;
// Leader-only messages. Only the SendHeartBeat branch returns directly.
218 if (message instanceof SendHeartBeat) {
219 return sendHeartBeat();
220 } else if(message instanceof SendInstallSnapshot) {
221 installSnapshotIfNeeded();
222 } else if (message instanceof Replicate) {
223 replicate((Replicate) message);
224 } else if (message instanceof InstallSnapshotReply){
225 handleInstallSnapshotReply(
226 (InstallSnapshotReply) message);
// Re-arm the heartbeat timer with the regular interval.
// NOTE(review): the construct enclosing this call is not visible in this excerpt - confirm
// on which paths it runs.
229 scheduleHeartBeat(HEART_BEAT_INTERVAL);
232 return super.handleMessage(sender, message);
235 private void handleInstallSnapshotReply(InstallSnapshotReply message) {
236 InstallSnapshotReply reply = message;
237 String followerId = reply.getFollowerId();
238 FollowerLogInformation followerLogInformation =
239 followerToLog.get(followerId);
241 followerLogInformation
242 .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
243 followerLogInformation
244 .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
/**
 * Handles a Replicate request for a log entry.
 *
 * <p>Without followers the entry's index is recorded as the commit index straight away and
 * an ApplyState message is sent for it. A ClientRequestTrackerImpl is also created from the
 * request's client actor and identifier.
 * NOTE(review): the branch structure and what happens to the tracker are not visible in
 * this excerpt.
 *
 * @param replicate the request carrying the log entry, the client actor and the identifier
 */
247 private void replicate(Replicate replicate) {
248 long logIndex = replicate.getReplicatedLogEntry().getIndex();
250 context.getLogger().debug("Replicate message " + logIndex);
// No followers: nobody has to acknowledge the entry, so it is committed immediately.
252 if (followerToActor.size() == 0) {
253 context.setCommitIndex(
254 replicate.getReplicatedLogEntry().getIndex());
// NOTE(review): the receiver of this tell(...) is on a line that is not visible here.
257 .tell(new ApplyState(replicate.getClientActor(),
258 replicate.getIdentifier(),
259 replicate.getReplicatedLogEntry()),
264 // Create a tracker entry we will use this later to notify the
267 new ClientRequestTrackerImpl(replicate.getClientActor(),
268 replicate.getIdentifier(),
/**
 * Builds an AppendEntries message for every follower.
 *
 * <p>The entry list starts out empty; when the replicated log holds the follower's next
 * index, the log is read from that index onwards.
 * NOTE(review): the statement that sends the message is not visible in this excerpt.
 */
276 private void sendAppendEntries() {
277 // Send an AppendEntries to all followers
278 for (String followerId : followerToActor.keySet()) {
279 ActorSelection followerActor =
280 followerToActor.get(followerId);
282 FollowerLogInformation followerLogInformation =
283 followerToLog.get(followerId);
285 long nextIndex = followerLogInformation.getNextIndex().get();
// Default payload is empty, i.e. an AppendEntries without entries.
287 List<ReplicatedLogEntry> entries = Collections.emptyList();
289 if(context.getReplicatedLog().isPresent(nextIndex)){
290 // TODO: Instead of sending all entries from nextIndex
291 // only send a fixed number of entries to each follower
292 // This is to avoid the situation where there are a lot of
293 // entries to install for a fresh follower or to a follower
294 // that has fallen too far behind with the log but yet is not
295 // eligible to receive a snapshot
297 context.getReplicatedLog().getFrom(nextIndex);
// Message fields: current term, this node's id, index/term of the entry preceding nextIndex,
// the entries, and the leader's commit index.
301 new AppendEntries(currentTerm(), context.getId(),
302 prevLogIndex(nextIndex), prevLogTerm(nextIndex),
303 entries, context.getCommitIndex()
311 * An installSnapshot is scheduled at an interval that is a multiple of
312 * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
313 * snapshots at every heartbeat.
315 private void installSnapshotIfNeeded(){
// Every follower is examined; an InstallSnapshot message is built for those whose next
// index is no longer in the replicated log but is covered by the snapshot.
// NOTE(review): the statement that sends the message is not visible in this excerpt.
316 for (String followerId : followerToActor.keySet()) {
317 ActorSelection followerActor =
318 followerToActor.get(followerId);
320 FollowerLogInformation followerLogInformation =
321 followerToLog.get(followerId);
323 long nextIndex = followerLogInformation.getNextIndex().get();
325 if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
// Message fields: current term, this node's id, snapshot index, snapshot term, snapshot data.
327 new InstallSnapshot(currentTerm(), context.getId(),
328 context.getReplicatedLog().getSnapshotIndex(),
329 context.getReplicatedLog().getSnapshotTerm(),
330 context.getReplicatedLog().getSnapshot()
/**
 * Invoked by handleMessage when a SendHeartBeat message arrives.
 *
 * <p>NOTE(review): only the guard is visible in this excerpt; the guarded work and the
 * returned value are not.
 *
 * @return the Raft state chosen by the body
 */
338 private RaftState sendHeartBeat() {
// The guarded work only happens when at least one follower exists.
339 if (followerToActor.size() > 0) {
345 private void stopHeartBeat() {
346 if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
347 heartbeatSchedule.cancel();
351 private void stopInstallSnapshotSchedule() {
352 if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
353 installSnapshotSchedule.cancel();
/**
 * Schedules a single SendHeartBeat message to this actor.
 *
 * <p>NOTE(review): the lines that handle the no-follower case and that pass the delay to the
 * scheduler are not visible in this excerpt.
 *
 * @param interval the delay requested by the caller
 */
357 private void scheduleHeartBeat(FiniteDuration interval) {
358 if(followerToActor.keySet().size() == 0){
359 // Optimization - do not bother scheduling a heartbeat as there are
366 // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
367 // message is sent to itself.
368 // Scheduling the heartbeat only once here because heartbeats do not
369 // need to be sent if there are other messages being sent to the remote
// Both the target and the sender of the scheduled message are this actor.
372 context.getActorSystem().scheduler().scheduleOnce(
374 context.getActor(), new SendHeartBeat(),
375 context.getActorSystem().dispatcher(), context.getActor());
/**
 * Schedules a single SendInstallSnapshot message to this actor, cancelling any check that
 * is still pending first.
 *
 * <p>NOTE(review): the lines that handle the no-follower case and that pass the delay to the
 * scheduler are not visible in this excerpt.
 *
 * @param interval the delay requested by the caller
 */
379 private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
380 if(followerToActor.keySet().size() == 0){
381 // Optimization - do not bother scheduling a snapshot check as there are
386 stopInstallSnapshotSchedule();
388 // Schedule a SendInstallSnapshot message to this actor; handleMessage
389 // reacts to it by calling installSnapshotIfNeeded()
390 installSnapshotSchedule =
391 context.getActorSystem().scheduler().scheduleOnce(
393 context.getActor(), new SendInstallSnapshot(),
394 context.getActorSystem().dispatcher(), context.getActor());
/**
 * Shuts this behavior down.
 *
 * <p>NOTE(review): the method body is not visible in this excerpt, so the exact clean-up
 * steps cannot be documented from here.
 *
 * @throws Exception as declared by the overridden method
 */
399 @Override public void close() throws Exception {
/**
 * Names the current leader, which is this node itself.
 *
 * @return the id reported by the actor context
 */
403 @Override public String getLeaderId() {
404 return context.getId();