2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
17 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftState;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
22 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
31 import scala.concurrent.duration.FiniteDuration;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.HashMap;
36 import java.util.List;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicLong;
42 * The behavior of a RaftActor when it is in the Leader state
46 * <li> Upon election: send initial empty AppendEntries RPCs
47 * (heartbeat) to each server; repeat during idle periods to
48 * prevent election timeouts (§5.2)
49 * <li> If command received from client: append entry to local log,
50 * respond after entry applied to state machine (§5.3)
51 * <li> If last log index ≥ nextIndex for a follower: send
52 * AppendEntries RPC with log entries starting at nextIndex
54 * <li> If successful: update nextIndex and matchIndex for
56 * <li> If AppendEntries fails because of log inconsistency:
57 * decrement nextIndex and retry (§5.3)
59 * <li> If there exists an N such that N > commitIndex, a majority
60 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
61 * set commitIndex = N (§5.3, §5.4).
63 public class Leader extends AbstractRaftActorBehavior {
// Leader-side state.
// followerToLog and followerToActor are both keyed by follower id and are
// populated in the constructor from context.getPeerAddresses().
66 private final Map<String, FollowerLogInformation> followerToLog =
69 private final Map<String, ActorSelection> followerToActor = new HashMap<>();
// Handles for scheduled work; the stop*/schedule* methods cancel and re-create
// heartbeatSchedule and installSnapshotSchedule.
71 private Cancellable heartbeatSchedule = null;
// NOTE(review): appendEntriesSchedule is not referenced by any code visible in this view.
72 private Cancellable appendEntriesSchedule = null;
73 private Cancellable installSnapshotSchedule = null;
// Client request trackers, looked up by log index in findClientRequestTracker.
75 private List<ClientRequestTracker> trackerList = new ArrayList<>();
// Replica count an index must reach before handleAppendEntriesReply may
// commit it; computed once in the constructor.
77 private final int minReplicationCount;
/**
 * Sets up the leader behavior: registers every peer as a follower, derives
 * the replication threshold and starts the heartbeat and install-snapshot
 * timers.
 *
 * @param context raft actor context supplying the peer addresses and actor
 *                selections used here
 */
79 public Leader(RaftActorContext context) {
// NOTE(review): commitIndex is moved to the last local log index as soon as
// this behavior is constructed, before any follower has acknowledged
// anything — confirm this is intended.
82 if (lastIndex() >= 0) {
83 context.setCommitIndex(lastIndex());
// One FollowerLogInformation and one ActorSelection per peer id.
86 for (String followerId : context.getPeerAddresses().keySet()) {
87 FollowerLogInformation followerLogInformation =
88 new FollowerLogInformationImpl(followerId,
89 new AtomicLong(lastIndex()),
92 followerToActor.put(followerId,
93 context.actorSelection(context.getPeerAddress(followerId)));
95 followerToLog.put(followerId, followerLogInformation);
// With at least one follower the threshold is (followers + 1) / 2 + 1, i.e.
// a majority of followers + 1 (presumably the +1 is this leader); with no
// followers it is 0.
99 if (followerToActor.size() > 0) {
100 minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
102 minReplicationCount = 0;
106 // Immediately schedule a heartbeat
107 // Upon election: send initial empty AppendEntries RPCs
108 // (heartbeat) to each server; repeat during idle periods to
109 // prevent election timeouts (§5.2)
110 scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
// The first install-snapshot check is delayed by 1000 times the heartbeat
// interval length, expressed in the heartbeat interval's own time unit.
112 scheduleInstallSnapshotCheck(
113 new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
114 HEART_BEAT_INTERVAL.unit())
/**
 * Handles an AppendEntries message arriving while this node is leader.
 * Reports it as unexpected via an error call and returns
 * {@code suggestedState} unchanged.
 */
119 @Override protected RaftState handleAppendEntries(ActorRef sender,
120 AppendEntries appendEntries, RaftState suggestedState) {
123 .error("An unexpected AppendEntries received in state " + state());
125 return suggestedState;
/**
 * Processes a follower's reply to AppendEntries: updates that follower's
 * match/next indexes, tries to advance the commit index, and applies newly
 * committed entries to the state machine.
 *
 * @return {@code suggestedState}, unchanged
 */
128 @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
129 AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
131 // Do not take any other action since a behavior change is coming
132 if (suggestedState != state())
133 return suggestedState;
135 // Update the FollowerLogInformation
136 String followerId = appendEntriesReply.getFollowerId();
// NOTE(review): no null check on this lookup is visible here; a reply
// carrying an id that is not in followerToLog would fail on the calls below.
137 FollowerLogInformation followerLogInformation =
138 followerToLog.get(followerId);
// On success the follower is known to hold everything up to its reported
// last index, so matchIndex is set to it and nextIndex to one past it.
139 if (appendEntriesReply.isSuccess()) {
140 followerLogInformation
141 .setMatchIndex(appendEntriesReply.getLogLastIndex());
142 followerLogInformation
143 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
// Otherwise nextIndex is decremented.
145 followerLogInformation.decrNextIndex();
148 // Now figure out if this reply warrants a change in the commitIndex
149 // If there exists an N such that N > commitIndex, a majority
150 // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
151 // set commitIndex = N (§5.3, §5.4).
// The loop header has no termination condition; the exit is presumably a
// break in lines not visible in this view.
152 for (long N = context.getCommitIndex() + 1; ; N++) {
// Count starts at 1 — presumably for the leader's own copy of the entry.
153 int replicatedCount = 1;
155 for (FollowerLogInformation info : followerToLog.values()) {
156 if (info.getMatchIndex().get() >= N) {
// N is only committed when enough replicas hold it and the entry exists
// locally (plus a term comparison whose right-hand side is not visible here).
161 if (replicatedCount >= minReplicationCount) {
162 ReplicatedLogEntry replicatedLogEntry =
163 context.getReplicatedLog().get(N);
164 if (replicatedLogEntry != null
165 && replicatedLogEntry.getTerm()
167 context.setCommitIndex(N);
174 // Apply the change to the state machine
175 if (context.getCommitIndex() > context.getLastApplied()) {
176 applyLogToStateMachine(context.getCommitIndex());
179 return suggestedState;
/**
 * Scans trackerList for the tracker whose index equals {@code logIndex}.
 * The return statements are not visible in this view.
 *
 * @param logIndex log index to look up
 */
182 protected ClientRequestTracker findClientRequestTracker(long logIndex) {
183 for (ClientRequestTracker tracker : trackerList) {
184 if (tracker.getIndex() == logIndex) {
/**
 * Ignores the vote reply and returns {@code suggestedState} unchanged.
 */
192 @Override protected RaftState handleRequestVoteReply(ActorRef sender,
193 RequestVoteReply requestVoteReply, RaftState suggestedState) {
194 return suggestedState;
/** Returns the state this behavior represents: {@link RaftState#Leader}. */
197 @Override public RaftState state() {
198 return RaftState.Leader;
/**
 * Entry point for messages delivered to the leader. Leader-specific
 * messages (SendHeartBeat, SendInstallSnapshot, Replicate,
 * InstallSnapshotReply) are dispatched here; the method then delegates to
 * {@code super.handleMessage}.
 *
 * @param sender  the sending actor; must not be null
 * @param message the message to process
 */
201 @Override public RaftState handleMessage(ActorRef sender, Object message) {
202 Preconditions.checkNotNull(sender, "sender should not be null");
// SendHeartBeat is the only branch that returns directly, without reaching
// super.handleMessage.
205 if (message instanceof SendHeartBeat) {
206 return sendHeartBeat();
207 } else if(message instanceof SendInstallSnapshot) {
208 installSnapshotIfNeeded();
209 } else if (message instanceof Replicate) {
210 replicate((Replicate) message);
211 } else if (message instanceof InstallSnapshotReply){
212 // FIXME : Should I be checking the term here too?
213 handleInstallSnapshotReply(
214 (InstallSnapshotReply) message);
// Re-arm the heartbeat timer with the regular interval. The enclosing
// construct is not visible in this view, so the exact paths on which this
// runs cannot be confirmed here.
217 scheduleHeartBeat(HEART_BEAT_INTERVAL);
220 return super.handleMessage(sender, message);
/**
 * Records that the replying follower is caught up to the leader's snapshot:
 * matchIndex becomes the snapshot index and nextIndex the entry after it.
 *
 * NOTE(review): the visible lines neither inspect the reply's outcome nor
 * null-check the follower lookup — confirm against the full source.
 */
223 private void handleInstallSnapshotReply(InstallSnapshotReply message) {
224 InstallSnapshotReply reply = message;
225 String followerId = reply.getFollowerId();
226 FollowerLogInformation followerLogInformation =
227 followerToLog.get(followerId);
229 followerLogInformation
230 .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
231 followerLogInformation
232 .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
/**
 * Handles a Replicate request for a log entry.
 *
 * @param replicate carries the log entry together with the client actor and
 *                  request identifier
 */
235 private void replicate(Replicate replicate) {
236 long logIndex = replicate.getReplicatedLogEntry().getIndex();
238 context.getLogger().debug("Replicate message " + logIndex);
// No followers: the entry's index becomes the commit index right away and an
// ApplyState message is sent (the target of .tell is not visible here).
240 if (followerToActor.size() == 0) {
241 context.setCommitIndex(
242 replicate.getReplicatedLogEntry().getIndex());
245 .tell(new ApplyState(replicate.getClientActor(),
246 replicate.getIdentifier(),
247 replicate.getReplicatedLogEntry()),
// Otherwise a ClientRequestTrackerImpl is created for the request.
252 // Create a tracker entry we will use this later to notify the
255 new ClientRequestTrackerImpl(replicate.getClientActor(),
256 replicate.getIdentifier(),
/**
 * Builds an AppendEntries message for every follower, based on that
 * follower's nextIndex.
 */
264 private void sendAppendEntries() {
265 // Send an AppendEntries to all followers
266 for (String followerId : followerToActor.keySet()) {
267 ActorSelection followerActor =
268 followerToActor.get(followerId);
270 FollowerLogInformation followerLogInformation =
271 followerToLog.get(followerId);
273 long nextIndex = followerLogInformation.getNextIndex().get();
// Default to an empty entry list; entries from nextIndex onward are fetched
// only when the log still holds nextIndex.
275 List<ReplicatedLogEntry> entries = Collections.emptyList();
277 if(context.getReplicatedLog().isPresent(nextIndex)){
279 context.getReplicatedLog().getFrom(nextIndex);
283 new AppendEntries(currentTerm(), context.getId(),
284 prevLogIndex(nextIndex), prevLogTerm(nextIndex),
285 entries, context.getCommitIndex()
/**
 * Checks every follower and builds an InstallSnapshot message for those
 * whose nextIndex is no longer in the log but is covered by the snapshot.
 */
292 private void installSnapshotIfNeeded(){
293 for (String followerId : followerToActor.keySet()) {
294 ActorSelection followerActor =
295 followerToActor.get(followerId);
297 FollowerLogInformation followerLogInformation =
298 followerToLog.get(followerId);
300 long nextIndex = followerLogInformation.getNextIndex().get();
// Snapshot is needed only when nextIndex is absent from the log AND inside
// the snapshot.
302 if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
304 new InstallSnapshot(currentTerm(), context.getId(),
305 context.getReplicatedLog().getSnapshotIndex(),
306 context.getReplicatedLog().getSnapshotTerm(),
307 context.getReplicatedLog().getSnapshot()
/**
 * Invoked from handleMessage when a SendHeartBeat message arrives. Only acts
 * when at least one follower is registered; the guarded body and the return
 * statement are not visible in this view.
 */
315 private RaftState sendHeartBeat() {
316 if (followerToActor.size() > 0) {
/** Cancels the pending heartbeat, if one exists and is not already cancelled. */
322 private void stopHeartBeat() {
323 if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
324 heartbeatSchedule.cancel();
/** Cancels the pending install-snapshot check, if one exists and is not already cancelled. */
328 private void stopInstallSnapshotSchedule() {
329 if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
330 installSnapshotSchedule.cancel();
/**
 * Schedules a single SendHeartBeat message addressed to this actor.
 *
 * @param interval delay for the heartbeat (the line passing it to the
 *                 scheduler is not visible in this view)
 */
334 private void scheduleHeartBeat(FiniteDuration interval) {
335 if(followerToActor.keySet().size() == 0){
336 // Optimization - do not bother scheduling a heartbeat as there are
343 // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
344 // message is sent to itself.
345 // Scheduling the heartbeat only once here because heartbeats do not
346 // need to be sent if there are other messages being sent to the remote
// scheduleOnce: context.getActor() is passed both as the receiver and as the
// sender of the SendHeartBeat message.
349 context.getActorSystem().scheduler().scheduleOnce(
351 context.getActor(), new SendHeartBeat(),
352 context.getActorSystem().dispatcher(), context.getActor());
/**
 * Schedules a single SendInstallSnapshot message addressed to this actor,
 * replacing any install-snapshot check that is still pending.
 *
 * @param interval delay for the check (the line passing it to the scheduler
 *                 is not visible in this view)
 */
356 private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
357 if(followerToActor.keySet().size() == 0){
358 // Optimization - the follower set is empty, so the snapshot check is not needed
363 stopInstallSnapshotSchedule();
365 // Schedule a one-shot SendInstallSnapshot message to this actor; handleMessage
366 // reacts to it by calling installSnapshotIfNeeded()
367 installSnapshotSchedule =
368 context.getActorSystem().scheduler().scheduleOnce(
370 context.getActor(), new SendInstallSnapshot(),
371 context.getActorSystem().dispatcher(), context.getActor());
// Closes this behavior; the body is not visible in this view.
// NOTE(review): confirm it cancels the heartbeat and install-snapshot schedules.
376 @Override public void close() throws Exception {
/** Returns the id of the current leader, which is this node's own id from the context. */
380 @Override public String getLeaderId() {
381 return context.getId();