2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
17 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftState;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
22 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
31 import scala.concurrent.duration.FiniteDuration;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.HashMap;
36 import java.util.List;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicLong;
42 * The behavior of a RaftActor when it is in the Leader state.
46 * <li> Upon election: send initial empty AppendEntries RPCs
47 * (heartbeat) to each server; repeat during idle periods to
48 * prevent election timeouts (§5.2)
49 * <li> If command received from client: append entry to local log,
50 * respond after entry applied to state machine (§5.3)
51 * <li> If last log index ≥ nextIndex for a follower: send
52 * AppendEntries RPC with log entries starting at nextIndex
54 * <li> If successful: update nextIndex and matchIndex for
56 * <li> If AppendEntries fails because of log inconsistency:
57 * decrement nextIndex and retry (§5.3)
59 * <li> If there exists an N such that N > commitIndex, a majority
60 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
61 * set commitIndex = N (§5.3, §5.4).
63 public class Leader extends AbstractRaftActorBehavior {
// followerToLog: replication progress (next index / match index) for each follower id.
// followerToActor: ActorSelection used to message each follower, keyed by follower id.
66 private final Map<String, FollowerLogInformation> followerToLog =
69 private final Map<String, ActorSelection> followerToActor = new HashMap<>();
// Handles for one-shot scheduler tasks, kept so a pending task can be cancelled.
// NOTE(review): appendEntriesSchedule is not read or written anywhere in the visible code.
71 private Cancellable heartbeatSchedule = null;
72 private Cancellable appendEntriesSchedule = null;
73 private Cancellable installSnapshotSchedule = null;
// Outstanding client requests; findClientRequestTracker searches this list by log index.
75 private List<ClientRequestTracker> trackerList = new ArrayList<>();
// Number of servers, counting this leader, that must hold an entry before it can be
// committed; 0 when there are no followers.
77 private final int minReplicationCount;
79 public Leader(RaftActorContext context) {
// Builds the Leader behavior:
// - marks every entry already in the log as committed,
// - registers each peer as a follower (progress record + ActorSelection),
// - computes minReplicationCount,
// - schedules an immediate heartbeat and a later install-snapshot check.
// NOTE(review): setting commitIndex to lastIndex() on election does not wait for majority
// replication of entries from earlier terms (Raft §5.4.2) — confirm this is intended.
82 if (lastIndex() >= 0) {
83 context.setCommitIndex(lastIndex());
// One FollowerLogInformation and one ActorSelection per peer address.
// NOTE(review): the second FollowerLogInformationImpl argument is lastIndex(), presumably the
// initial nextIndex; Raft initializes nextIndex to lastIndex + 1 — confirm against
// FollowerLogInformationImpl.
86 for (String followerId : context.getPeerAddresses().keySet()) {
87 FollowerLogInformation followerLogInformation =
88 new FollowerLogInformationImpl(followerId,
89 new AtomicLong(lastIndex()),
92 followerToActor.put(followerId,
93 context.actorSelection(context.getPeerAddress(followerId)));
95 followerToLog.put(followerId, followerLogInformation);
98 context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet());
// Strict majority of a cluster of (followers + 1) servers: floor(size / 2) + 1.
100 if (followerToActor.size() > 0) {
101 minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
103 minReplicationCount = 0;
107 // Immediately schedule a heartbeat
108 // Upon election: send initial empty AppendEntries RPCs
109 // (heartbeat) to each server; repeat during idle periods to
110 // prevent election timeouts (§5.2)
111 scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
// First install-snapshot check runs after 1000 heartbeat intervals (same time unit).
113 scheduleInstallSnapshotCheck(
114 new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
115 HEART_BEAT_INTERVAL.unit())
// A leader does not expect AppendEntries. The request is only reported with an error-level
// message (the receiver of .error is on a line not visible here, presumably
// context.getLogger()), and suggestedState is returned unchanged.
// NOTE(review): any term comparison / step-down presumably happens in
// AbstractRaftActorBehavior before this is called — confirm.
120 @Override protected RaftState handleAppendEntries(ActorRef sender,
121 AppendEntries appendEntries, RaftState suggestedState) {
124 .error("An unexpected AppendEntries received in state " + state());
126 return suggestedState;
// Processes a follower's AppendEntries reply:
// 1. updates that follower's next/match index,
// 2. may advance commitIndex according to the Raft rule quoted below,
// 3. applies newly committed entries to the state machine.
// Always returns suggestedState.
129 @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
130 AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
132 // Do not take any other action since a behavior change is coming
133 if (suggestedState != state())
134 return suggestedState;
136 // Update the FollowerLogInformation
// NOTE(review): followerToLog.get() returns null for a follower id that is not registered,
// which would throw NullPointerException below.
137 String followerId = appendEntriesReply.getFollowerId();
138 FollowerLogInformation followerLogInformation =
139 followerToLog.get(followerId);
// Success: the follower holds everything up to getLogLastIndex(), so matchIndex is that
// index and nextIndex the one after it. Failure: decrement nextIndex so the next
// AppendEntries starts from an earlier entry.
140 if (appendEntriesReply.isSuccess()) {
141 followerLogInformation
142 .setMatchIndex(appendEntriesReply.getLogLastIndex());
143 followerLogInformation
144 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
146 followerLogInformation.decrNextIndex();
149 // Now figure out if this reply warrants a change in the commitIndex
150 // If there exists an N such that N > commitIndex, a majority
151 // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
152 // set commitIndex = N (§5.3, §5.4).
// replicatedCount starts at 1 to count this leader's own copy of entry N; the loop's exit
// condition is on lines not visible here.
153 for (long N = context.getCommitIndex() + 1; ; N++) {
154 int replicatedCount = 1;
156 for (FollowerLogInformation info : followerToLog.values()) {
157 if (info.getMatchIndex().get() >= N) {
// Enough copies: commit N only if the entry exists and passes the term comparison
// (completed on a line not visible here, presumably against the current term).
162 if (replicatedCount >= minReplicationCount) {
163 ReplicatedLogEntry replicatedLogEntry =
164 context.getReplicatedLog().get(N);
165 if (replicatedLogEntry != null
166 && replicatedLogEntry.getTerm()
168 context.setCommitIndex(N);
175 // Apply the change to the state machine
176 if (context.getCommitIndex() > context.getLastApplied()) {
177 applyLogToStateMachine(context.getCommitIndex());
180 return suggestedState;
// Looks up the ClientRequestTracker whose index equals logIndex by a linear scan of
// trackerList. NOTE(review): the not-found result is on lines not visible here
// (presumably null) — confirm before relying on it.
183 protected ClientRequestTracker findClientRequestTracker(long logIndex) {
184 for (ClientRequestTracker tracker : trackerList) {
185 if (tracker.getIndex() == logIndex) {
// Vote replies require no action once this actor is leader; suggestedState is returned as-is.
193 @Override protected RaftState handleRequestVoteReply(ActorRef sender,
194 RequestVoteReply requestVoteReply, RaftState suggestedState) {
195 return suggestedState;
// This behavior always reports the Leader state.
198 @Override public RaftState state() {
199 return RaftState.Leader;
// Entry point for messages while leader; sender must be non-null (checkNotNull throws NPE).
// Leader-internal messages are dispatched here:
//   SendHeartBeat        -> sendHeartBeat(), whose result is returned directly
//   SendInstallSnapshot  -> installSnapshotIfNeeded()
//   Replicate            -> replicate(...)
//   InstallSnapshotReply -> handleInstallSnapshotReply(...)
// The heartbeat is then re-armed with HEART_BEAT_INTERVAL and the message is passed to
// super.handleMessage. NOTE(review): the control flow around those two calls (e.g. a
// try/finally) is on lines not visible here — confirm which messages reach them.
202 @Override public RaftState handleMessage(ActorRef sender, Object message) {
203 Preconditions.checkNotNull(sender, "sender should not be null");
206 if (message instanceof SendHeartBeat) {
207 return sendHeartBeat();
208 } else if(message instanceof SendInstallSnapshot) {
209 installSnapshotIfNeeded();
210 } else if (message instanceof Replicate) {
211 replicate((Replicate) message);
212 } else if (message instanceof InstallSnapshotReply){
213 // FIXME : Should I be checking the term here too?
214 handleInstallSnapshotReply(
215 (InstallSnapshotReply) message);
218 scheduleHeartBeat(HEART_BEAT_INTERVAL);
221 return super.handleMessage(sender, message);
// Records that a follower has installed the leader's current snapshot: its matchIndex
// becomes the snapshot index and its nextIndex the entry right after it.
// NOTE(review): only getFollowerId() is read from the reply, so every reply is treated as
// a success and its term is never checked. An unregistered follower id would cause a
// NullPointerException.
224 private void handleInstallSnapshotReply(InstallSnapshotReply message) {
225 InstallSnapshotReply reply = message;
226 String followerId = reply.getFollowerId();
227 FollowerLogInformation followerLogInformation =
228 followerToLog.get(followerId);
230 followerLogInformation
231 .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
232 followerLogInformation
233 .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
// Handles a Replicate request for a log entry.
// With no followers, the entry is committed immediately (commitIndex = its index) and an
// ApplyState(clientActor, identifier, entry) message is sent. The receiver of that tell()
// is on a line not visible here, presumably this actor — confirm.
// Otherwise a ClientRequestTrackerImpl is created so the client can be notified later; the
// rest of that branch is not visible in this view.
236 private void replicate(Replicate replicate) {
237 long logIndex = replicate.getReplicatedLogEntry().getIndex();
239 context.getLogger().debug("Replicate message " + logIndex);
241 if (followerToActor.size() == 0) {
242 context.setCommitIndex(
243 replicate.getReplicatedLogEntry().getIndex());
246 .tell(new ApplyState(replicate.getClientActor(),
247 replicate.getIdentifier(),
248 replicate.getReplicatedLogEntry()),
253 // Create a tracker entry we will use this later to notify the
256 new ClientRequestTrackerImpl(replicate.getClientActor(),
257 replicate.getIdentifier(),
// Builds an AppendEntries for every follower, starting at that follower's nextIndex, and
// sends it (the send is on a line not visible here).
// Entries are included only when nextIndex is still present in the replicated log;
// otherwise the message is empty, like a heartbeat. Followers whose nextIndex now lies
// inside the snapshot are served by installSnapshotIfNeeded().
// NOTE(review): getFrom(nextIndex) appears to return every entry from nextIndex onward — no
// batch-size limit is visible here.
265 private void sendAppendEntries() {
266 // Send an AppendEntries to all followers
267 for (String followerId : followerToActor.keySet()) {
268 ActorSelection followerActor =
269 followerToActor.get(followerId);
271 FollowerLogInformation followerLogInformation =
272 followerToLog.get(followerId);
274 long nextIndex = followerLogInformation.getNextIndex().get();
276 List<ReplicatedLogEntry> entries = Collections.emptyList();
278 if(context.getReplicatedLog().isPresent(nextIndex)){
280 context.getReplicatedLog().getFrom(nextIndex);
284 new AppendEntries(currentTerm(), context.getId(),
285 prevLogIndex(nextIndex), prevLogTerm(nextIndex),
286 entries, context.getCommitIndex()
// For each follower whose nextIndex is no longer in the log but is covered by the snapshot,
// builds an InstallSnapshot (current term, leader id, snapshot index, snapshot term,
// snapshot data). The send is on a line not visible here, presumably to followerActor.
// NOTE(review): in the visible code scheduleInstallSnapshotCheck is called only from the
// constructor and schedules a single message, so confirm this check is re-armed elsewhere.
293 private void installSnapshotIfNeeded(){
294 for (String followerId : followerToActor.keySet()) {
295 ActorSelection followerActor =
296 followerToActor.get(followerId);
298 FollowerLogInformation followerLogInformation =
299 followerToLog.get(followerId);
301 long nextIndex = followerLogInformation.getNextIndex().get();
303 if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
305 new InstallSnapshot(currentTerm(), context.getId(),
306 context.getReplicatedLog().getSnapshotIndex(),
307 context.getReplicatedLog().getSnapshotTerm(),
308 context.getReplicatedLog().getSnapshot()
// Handles a SendHeartBeat tick. Only the follower-count guard is visible in this view;
// presumably the guarded body calls sendAppendEntries() — confirm. The returned RaftState
// is produced on lines not visible here.
316 private RaftState sendHeartBeat() {
317 if (followerToActor.size() > 0) {
// Cancels the pending heartbeat task, if one was scheduled and is not already cancelled.
323 private void stopHeartBeat() {
324 if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
325 heartbeatSchedule.cancel();
// Cancels the pending install-snapshot check, if one was scheduled and is not already
// cancelled.
329 private void stopInstallSnapshotSchedule() {
330 if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
331 installSnapshotSchedule.cancel();
// Schedules a single SendHeartBeat message to this actor, with this actor as sender.
// Skipped when there are no followers. The delay argument (presumably `interval`) is on a
// line not visible here.
335 private void scheduleHeartBeat(FiniteDuration interval) {
336 if(followerToActor.keySet().size() == 0){
337 // Optimization - do not bother scheduling a heartbeat as there are no followers.
344 // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
345 // message is sent to itself.
346 // Scheduling the heartbeat only once here because heartbeats do not
347 // need to be sent if there are other messages being sent to the remote
350 context.getActorSystem().scheduler().scheduleOnce(
352 context.getActor(), new SendHeartBeat(),
353 context.getActorSystem().dispatcher(), context.getActor());
// Cancels any pending install-snapshot check, then schedules a single SendInstallSnapshot
// message to this actor. Skipped when there are no followers. The delay argument
// (presumably `interval`) is on a line not visible here.
357 private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
358 if(followerToActor.keySet().size() == 0){
359 // Optimization - do not bother scheduling an install snapshot check as there are no followers.
364 stopInstallSnapshotSchedule();
366 // Schedule a one-shot SendInstallSnapshot message to this actor; handleMessage reacts
367 // to it by calling installSnapshotIfNeeded().
368 installSnapshotSchedule =
369 context.getActorSystem().scheduler().scheduleOnce(
371 context.getActor(), new SendInstallSnapshot(),
372 context.getActorSystem().dispatcher(), context.getActor());
// Releases this behavior's resources. The body is not visible in this view; presumably it
// cancels the scheduled heartbeat / install-snapshot tasks — confirm.
377 @Override public void close() throws Exception {
// In the Leader state this actor is itself the leader, so its own context id is returned.
381 @Override public String getLeaderId() {
382 return context.getId();