2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
17 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftState;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
22 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import scala.concurrent.duration.FiniteDuration;
34 import java.util.ArrayList;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicLong;
43 * The behavior of a RaftActor when it is in the Leader state
47 * <li> Upon election: send initial empty AppendEntries RPCs
48 * (heartbeat) to each server; repeat during idle periods to
49 * prevent election timeouts (§5.2)
50 * <li> If command received from client: append entry to local log,
51 * respond after entry applied to state machine (§5.3)
52 * <li> If last log index ≥ nextIndex for a follower: send
53 * AppendEntries RPC with log entries starting at nextIndex
55 * <li> If successful: update nextIndex and matchIndex for
57 * <li> If AppendEntries fails because of log inconsistency:
58 * decrement nextIndex and retry (§5.3)
60 * <li> If there exists an N such that N > commitIndex, a majority
61 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
62 * set commitIndex = N (§5.3, §5.4).
64 public class Leader extends AbstractRaftActorBehavior {
// Per-follower log bookkeeping (match index / next index), keyed by follower id.
67 private final Map<String, FollowerLogInformation> followerToLog =
// Actor selection used to reach each follower, keyed by follower id.
70 private final Map<String, ActorSelection> followerToActor = new HashMap<>();
// Handle on the pending heartbeat; cancelled by stopHeartBeat().
72 private Cancellable heartbeatSchedule = null;
// NOTE(review): no read or write of this field is visible elsewhere in this class — confirm it is still needed.
73 private Cancellable appendEntriesSchedule = null;
// Handle on the pending install-snapshot check; cancelled by stopInstallSnapshotSchedule().
74 private Cancellable installSnapshotSchedule = null;
// Trackers for client requests; searched by log index in findClientRequestTracker().
76 private List<ClientRequestTracker> trackerList = new ArrayList<>();
// Replication count that handleAppendEntriesReply() requires before advancing the
// commit index; computed once in the constructor from the number of followers.
78 private final int minReplicationCount;
/**
 * Builds the Leader behavior for the given context: registers every peer as a
 * follower, derives the replication threshold, and schedules the first
 * heartbeat and the install-snapshot check.
 *
 * @param context supplies the peer addresses, logger and commit index used here
 */
80 public Leader(RaftActorContext context) {
// When the last log index is non-negative the commit index is moved straight to it.
// NOTE(review): this treats every local entry as committed on taking leadership —
// confirm this is intended.
83 if (lastIndex() >= 0) {
84 context.setCommitIndex(lastIndex());
// One FollowerLogInformation (seeded with lastIndex()) and one ActorSelection per peer.
87 for (String followerId : context.getPeerAddresses().keySet()) {
88 FollowerLogInformation followerLogInformation =
89 new FollowerLogInformationImpl(followerId,
90 new AtomicLong(lastIndex()),
93 followerToActor.put(followerId,
94 context.actorSelection(context.getPeerAddress(followerId)));
96 followerToLog.put(followerId, followerLogInformation);
99 context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet());
// Replication threshold: (followers + 1) / 2 + 1 when there is at least one
// follower, 0 when this node has no peers.
101 if (followerToActor.size() > 0) {
102 minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
104 minReplicationCount = 0;
108 // Immediately schedule a heartbeat
109 // Upon election: send initial empty AppendEntries RPCs
110 // (heartbeat) to each server; repeat during idle periods to
111 // prevent election timeouts (§5.2)
112 scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
// The first install-snapshot check is scheduled 1000 heartbeat intervals from now.
114 scheduleInstallSnapshotCheck(
115 new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
116 HEART_BEAT_INTERVAL.unit())
/**
 * Handles an {@link AppendEntries} message delivered to this behavior.
 *
 * @param sender the actor the message came from
 * @param appendEntries the received message
 * @return the resulting RaftState
 */
121 @Override protected RaftState handleAppendEntries(ActorRef sender,
122 AppendEntries appendEntries) {
/**
 * Processes a follower's reply to an AppendEntries: updates that follower's
 * match/next index, then tries to advance the commit index and applies newly
 * committed entries to the state machine.
 *
 * @param sender the actor the reply came from
 * @param appendEntriesReply the reply; its follower id selects the bookkeeping entry
 * @return the resulting RaftState
 */
127 @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
128 AppendEntriesReply appendEntriesReply) {
130 // Update the FollowerLogInformation
131 String followerId = appendEntriesReply.getFollowerId();
// NOTE(review): the lookup result is dereferenced below without a null check, so a
// reply carrying an id that is not in followerToLog would throw a
// NullPointerException — confirm replies can only come from known followers.
132 FollowerLogInformation followerLogInformation =
133 followerToLog.get(followerId);
// On success the follower's matchIndex becomes its reported last log index and
// its nextIndex the entry right after it.
134 if (appendEntriesReply.isSuccess()) {
135 followerLogInformation
136 .setMatchIndex(appendEntriesReply.getLogLastIndex());
137 followerLogInformation
138 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
141 // TODO: When we find that the follower is out of sync with the
142 // Leader we simply decrement that follower's next index by 1.
143 // Would it be possible to do better than this? The RAFT spec
144 // does not explicitly deal with it but may be something for us to
147 followerLogInformation.decrNextIndex();
150 // Now figure out if this reply warrants a change in the commitIndex
151 // If there exists an N such that N > commitIndex, a majority
152 // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
153 // set commitIndex = N (§5.3, §5.4).
// The for header has no termination condition, so the loop has to be left from
// inside its body.
154 for (long N = context.getCommitIndex() + 1; ; N++) {
// Starts at 1 — presumably counting the leader's own copy of the entry.
155 int replicatedCount = 1;
157 for (FollowerLogInformation info : followerToLog.values()) {
158 if (info.getMatchIndex().get() >= N) {
// Enough replicas: advance the commit index only if the entry exists locally and
// its term passes the term check.
163 if (replicatedCount >= minReplicationCount) {
164 ReplicatedLogEntry replicatedLogEntry =
165 context.getReplicatedLog().get(N);
166 if (replicatedLogEntry != null
167 && replicatedLogEntry.getTerm()
169 context.setCommitIndex(N);
176 // Apply the change to the state machine
177 if (context.getCommitIndex() > context.getLastApplied()) {
178 applyLogToStateMachine(context.getCommitIndex());
/**
 * Scans trackerList for the tracker whose index equals the given log index.
 *
 * @param logIndex index of the log entry whose tracker is wanted
 * @return the matching ClientRequestTracker (result when nothing matches: presumably null — TODO confirm)
 */
184 protected ClientRequestTracker findClientRequestTracker(long logIndex) {
185 for (ClientRequestTracker tracker : trackerList) {
186 if (tracker.getIndex() == logIndex) {
/**
 * Handles a {@link RequestVoteReply} message delivered to this behavior.
 *
 * @param sender the actor the reply came from
 * @param requestVoteReply the received reply
 * @return the resulting RaftState
 */
194 @Override protected RaftState handleRequestVoteReply(ActorRef sender,
195 RequestVoteReply requestVoteReply) {
/**
 * Identifies the Raft state this behavior represents.
 *
 * @return always {@code RaftState.Leader}
 */
199 @Override public RaftState state() {
200 return RaftState.Leader;
/**
 * Entry point for every message delivered to the leader. The message is first
 * converted with fromSerializableMessage(); an RPC carrying a newer term makes
 * the leader step down, leader-internal messages are dispatched to their
 * handlers, and anything else is passed to the superclass.
 *
 * @param sender the actor the message came from; must not be null
 * @param originalMessage the message as received, possibly in serializable form
 * @return the RaftState the actor should be in after handling the message
 * @throws NullPointerException if sender is null
 */
203 @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
204 Preconditions.checkNotNull(sender, "sender should not be null");
206 Object message = fromSerializableMessage(originalMessage);
208 if (message instanceof RaftRPC) {
209 RaftRPC rpc = (RaftRPC) message;
210 // If RPC request or response contains term T > currentTerm:
211 // set currentTerm = T, convert to follower (§5.1)
212 // This applies to all RPC messages and responses
213 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
// Persist the newer term before stepping down.
214 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
215 return RaftState.Follower;
// Leader-internal messages: heartbeat tick, install-snapshot tick, replication
// request and snapshot reply.
220 if (message instanceof SendHeartBeat) {
221 return sendHeartBeat();
222 } else if(message instanceof SendInstallSnapshot) {
223 installSnapshotIfNeeded();
224 } else if (message instanceof Replicate) {
225 replicate((Replicate) message);
226 } else if (message instanceof InstallSnapshotReply){
227 handleInstallSnapshotReply(
228 (InstallSnapshotReply) message);
// Re-arm the heartbeat one full interval from now.
231 scheduleHeartBeat(HEART_BEAT_INTERVAL);
// The converted message, not the original, is handed to the superclass.
234 return super.handleMessage(sender, message);
/**
 * Records that the replying follower is now at the leader's snapshot:
 * matchIndex is set to the snapshot index and nextIndex to the entry after it.
 * NOTE(review): the reply's outcome does not appear to be inspected before the
 * indices are updated, and the follower lookup is not null-checked — confirm.
 *
 * @param message the reply; its follower id selects the bookkeeping entry
 */
237 private void handleInstallSnapshotReply(InstallSnapshotReply message) {
238 InstallSnapshotReply reply = message;
239 String followerId = reply.getFollowerId();
240 FollowerLogInformation followerLogInformation =
241 followerToLog.get(followerId);
243 followerLogInformation
244 .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
245 followerLogInformation
246 .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
/**
 * Handles a Replicate request for a log entry.
 *
 * @param replicate carries the log entry plus the client actor and identifier
 *                  of the request that produced it
 */
249 private void replicate(Replicate replicate) {
250 long logIndex = replicate.getReplicatedLogEntry().getIndex();
252 context.getLogger().debug("Replicate message " + logIndex);
// With no followers there is nobody to wait for: the entry's index is committed
// right away and an ApplyState for the entry is sent.
254 if (followerToActor.size() == 0) {
255 context.setCommitIndex(
256 replicate.getReplicatedLogEntry().getIndex());
259 .tell(new ApplyState(replicate.getClientActor(),
260 replicate.getIdentifier(),
261 replicate.getReplicatedLogEntry()),
266 // Create a tracker entry; we will use this later to notify the
// The tracker is built from the request's client actor and identifier.
269 new ClientRequestTrackerImpl(replicate.getClientActor(),
270 replicate.getIdentifier(),
/**
 * Sends an AppendEntries to every follower. The entry list defaults to empty
 * and log entries starting at the follower's nextIndex are used when that index
 * is present in the replicated log.
 */
278 private void sendAppendEntries() {
279 // Send an AppendEntries to all followers
280 for (String followerId : followerToActor.keySet()) {
281 ActorSelection followerActor =
282 followerToActor.get(followerId);
284 FollowerLogInformation followerLogInformation =
285 followerToLog.get(followerId);
287 long nextIndex = followerLogInformation.getNextIndex().get();
// Default payload: no entries.
289 List<ReplicatedLogEntry> entries = Collections.emptyList();
291 if(context.getReplicatedLog().isPresent(nextIndex)){
292 // TODO: Instead of sending all entries from nextIndex
293 // only send a fixed number of entries to each follower
294 // This is to avoid the situation where there are a lot of
295 // entries to install for a fresh follower or to a follower
296 // that has fallen too far behind with the log but yet is not
297 // eligible to receive a snapshot
299 context.getReplicatedLog().getFrom(nextIndex);
// The message carries the current term, this node's id, the index and term of the
// entry preceding nextIndex, the entries and the leader's commit index; it is
// converted with toSerializable() before sending.
303 new AppendEntries(currentTerm(), context.getId(), prevLogIndex(nextIndex),
304 prevLogTerm(nextIndex), entries, context.getCommitIndex()).toSerializable(),
310 * An installSnapshot is scheduled at an interval that is a multiple of
311 * HEART_BEAT_INTERVAL. This is to avoid the need to check for installing
312 * snapshots at every heartbeat.
314 private void installSnapshotIfNeeded(){
// Every follower is examined on each check.
315 for (String followerId : followerToActor.keySet()) {
316 ActorSelection followerActor =
317 followerToActor.get(followerId);
319 FollowerLogInformation followerLogInformation =
320 followerToLog.get(followerId);
322 long nextIndex = followerLogInformation.getNextIndex().get();
// A snapshot is needed when the follower's nextIndex is no longer in the log but
// is covered by the snapshot.
324 if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
// The InstallSnapshot carries the current term, this node's id, the snapshot's
// index and term, and the snapshot itself.
326 new InstallSnapshot(currentTerm(), context.getId(),
327 context.getReplicatedLog().getSnapshotIndex(),
328 context.getReplicatedLog().getSnapshotTerm(),
329 context.getReplicatedLog().getSnapshot()
/**
 * Handles a SendHeartBeat tick (dispatched from handleMessage); the heartbeat
 * work is conditional on there being at least one follower.
 *
 * @return the resulting RaftState
 */
337 private RaftState sendHeartBeat() {
338 if (followerToActor.size() > 0) {
/**
 * Cancels the pending heartbeat, if one has been scheduled and it has not
 * already been cancelled.
 */
344 private void stopHeartBeat() {
345 if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
346 heartbeatSchedule.cancel();
/**
 * Cancels the pending install-snapshot check, if one has been scheduled and it
 * has not already been cancelled.
 */
350 private void stopInstallSnapshotSchedule() {
351 if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
352 installSnapshotSchedule.cancel();
/**
 * Schedules a single SendHeartBeat message addressed to this actor itself.
 *
 * @param interval how far in the future the heartbeat should fire (presumably
 *                 passed as the scheduleOnce delay — TODO confirm)
 */
356 private void scheduleHeartBeat(FiniteDuration interval) {
357 if(followerToActor.keySet().size() == 0){
358 // Optimization - do not bother scheduling a heartbeat as there are
365 // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
366 // message is sent to itself.
367 // Scheduling the heartbeat only once here because heartbeats do not
368 // need to be sent if there are other messages being sent to the remote
// This actor is both the receiver and the sender of the scheduled message.
371 context.getActorSystem().scheduler().scheduleOnce(
373 context.getActor(), new SendHeartBeat(),
374 context.getActorSystem().dispatcher(), context.getActor());
/**
 * Schedules a single SendInstallSnapshot message addressed to this actor
 * itself, replacing any check that is still pending.
 *
 * @param interval how far in the future the check should fire (presumably
 *                 passed as the scheduleOnce delay — TODO confirm)
 */
378 private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
379 if(followerToActor.keySet().size() == 0){
380 // Optimization - do not bother scheduling an install snapshot check as there are
// Cancel any check that is still pending before scheduling a new one.
385 stopInstallSnapshotSchedule();
387 // Schedule a one-shot SendInstallSnapshot message to this actor; handleMessage
388 // reacts to it by calling installSnapshotIfNeeded()
389 installSnapshotSchedule =
390 context.getActorSystem().scheduler().scheduleOnce(
392 context.getActor(), new SendInstallSnapshot(),
393 context.getActorSystem().dispatcher(), context.getActor());
/**
 * Closes this behavior.
 * NOTE(review): both heartbeatSchedule and installSnapshotSchedule may still be
 * pending at this point — confirm that each is cancelled here.
 *
 * @throws Exception declared by the overridden method
 */
398 @Override public void close() throws Exception {
/**
 * Reports the current leader's id, which for this behavior is the id of the
 * local actor.
 *
 * @return the id obtained from the context
 */
402 @Override public String getLeaderId() {
403 return context.getId();