2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
17 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftState;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
22 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import scala.concurrent.duration.FiniteDuration;
34 import java.util.ArrayList;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicLong;
43 * The behavior of a RaftActor when it is in the Leader state
47 * <li> Upon election: send initial empty AppendEntries RPCs
48 * (heartbeat) to each server; repeat during idle periods to
49 * prevent election timeouts (§5.2)
50 * <li> If command received from client: append entry to local log,
51 * respond after entry applied to state machine (§5.3)
52 * <li> If last log index ≥ nextIndex for a follower: send
53 * AppendEntries RPC with log entries starting at nextIndex
55 * <li> If successful: update nextIndex and matchIndex for the follower (§5.3)
57 * <li> If AppendEntries fails because of log inconsistency:
58 * decrement nextIndex and retry (§5.3)
60 * <li> If there exists an N such that N > commitIndex, a majority
61 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
62 * set commitIndex = N (§5.3, §5.4).
64 public class Leader extends AbstractRaftActorBehavior {
// Per-follower replication progress (nextIndex / matchIndex), keyed by follower id.
// NOTE(review): the initializer expression for this field is missing from this copy.
67 private final Map<String, FollowerLogInformation> followerToLog =
// Actor selections used to reach each follower, keyed by follower id.
70 private final Map<String, ActorSelection> followerToActor = new HashMap<>();
// Handle for the scheduled heartbeat; cancelled by stopHeartBeat().
72 private Cancellable heartbeatSchedule = null;
// NOTE(review): not referenced anywhere in the visible code - candidate for removal.
73 private Cancellable appendEntriesSchedule = null;
// Handle for the one-shot SendInstallSnapshot check scheduled by
// scheduleInstallSnapshotCheck(); cancelled by stopInstallSnapshotSchedule().
74 private Cancellable installSnapshotSchedule = null;
// Client request trackers, searched by log index in findClientRequestTracker();
// presumably populated by replicate() - confirm.
76 private List<ClientRequestTracker> trackerList = new ArrayList<>();
// Number of servers (leader included) that must hold an entry before it can be
// committed; computed in the constructor, 0 when there are no followers.
78 private final int minReplicationCount;
// Initializes leader state: per-peer replication tracking and actor selections,
// the commit quorum size, an immediate heartbeat and a delayed snapshot check.
// NOTE(review): lines are missing from this copy (closing braces, the "else"
// before the zero quorum, and the end of the FollowerLogInformationImpl(...)
// argument list); verify against the original before relying on this listing.
80 public Leader(RaftActorContext context) {
// NOTE(review): this treats every entry already in the new leader's log as
// committed. Raft (§5.4.2) does not let a leader commit entries from earlier
// terms merely because they are in its log - confirm this is intended.
83 if (lastIndex() >= 0) {
84 context.setCommitIndex(lastIndex());
// Create replication-tracking state and an actor selection for every peer.
87 for (String followerId : context.getPeerAddresses().keySet()) {
88 FollowerLogInformation followerLogInformation =
89 new FollowerLogInformationImpl(followerId,
// NOTE(review): appears to seed the follower's nextIndex (cf. getNextIndex() in
// sendAppendEntries); Raft §5.3 initializes nextIndex to last log index + 1 - confirm.
90 new AtomicLong(lastIndex()),
93 followerToActor.put(followerId,
94 context.actorSelection(context.getPeerAddress(followerId)));
96 followerToLog.put(followerId, followerLogInformation);
99 context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet());
// Commit quorum: a strict majority of the cluster (followers + this leader),
// i.e. floor(n / 2) + 1 servers for n = followers + 1.
101 if (followerToActor.size() > 0) {
102 minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
104 minReplicationCount = 0;
108 // Immediately schedule a heartbeat
109 // Upon election: send initial empty AppendEntries RPCs
110 // (heartbeat) to each server; repeat during idle periods to
111 // prevent election timeouts (§5.2)
112 scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
// NOTE(review): the first snapshot check fires after 1000 x HEART_BEAT_INTERVAL
// (in HEART_BEAT_INTERVAL's own unit) - confirm this delay is intended.
114 scheduleInstallSnapshotCheck(
115 new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
116 HEART_BEAT_INTERVAL.unit())
// Handles an AppendEntries RPC received while this server is the leader.
// NOTE(review): the method body is missing from this copy.
121 @Override protected RaftState handleAppendEntries(ActorRef sender,
122 AppendEntries appendEntries) {
// Processes a follower's AppendEntriesReply: updates that follower's
// matchIndex/nextIndex, advances commitIndex using the Raft commit rule quoted
// below, and applies newly committed entries to the state machine.
// NOTE(review): lines are missing from this copy - closing braces, the "else"
// of the success check, the replicatedCount increment, any exit from the
// condition-less for-loop, and the right-hand operand of the term comparison.
// Verify against the original before relying on this listing.
127 @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
128 AppendEntriesReply appendEntriesReply) {
130 // Update the FollowerLogInformation
131 String followerId = appendEntriesReply.getFollowerId();
// NOTE(review): get() yields null for a follower id not registered in the
// constructor, which would make the calls below throw NullPointerException.
132 FollowerLogInformation followerLogInformation =
133 followerToLog.get(followerId);
134 if (appendEntriesReply.isSuccess()) {
// Success: record the follower's reported last log index as its matchIndex;
// the next AppendEntries for it starts right after that index.
135 followerLogInformation
136 .setMatchIndex(appendEntriesReply.getLogLastIndex());
137 followerLogInformation
138 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
// Failure: move nextIndex back so an earlier entry is tried, per the
// "decrement nextIndex and retry" rule in the class javadoc.
140 followerLogInformation.decrNextIndex();
143 // Now figure out if this reply warrants a change in the commitIndex
144 // If there exists an N such that N > commitIndex, a majority
145 // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
146 // set commitIndex = N (§5.3, §5.4).
147 for (long N = context.getCommitIndex() + 1; ; N++) {
// Start at 1 to count the leader itself, then add each follower whose
// matchIndex has reached N.
148 int replicatedCount = 1;
150 for (FollowerLogInformation info : followerToLog.values()) {
151 if (info.getMatchIndex().get() >= N) {
156 if (replicatedCount >= minReplicationCount) {
157 ReplicatedLogEntry replicatedLogEntry =
158 context.getReplicatedLog().get(N);
159 if (replicatedLogEntry != null
160 && replicatedLogEntry.getTerm()
162 context.setCommitIndex(N);
169 // Apply the change to the state machine
170 if (context.getCommitIndex() > context.getLastApplied()) {
171 applyLogToStateMachine(context.getCommitIndex());
// Looks up the ClientRequestTracker registered for the given log index by a
// linear scan of trackerList.
// NOTE(review): the return statements (including the not-found result) and the
// closing braces are missing from this copy.
177 protected ClientRequestTracker findClientRequestTracker(long logIndex) {
178 for (ClientRequestTracker tracker : trackerList) {
179 if (tracker.getIndex() == logIndex) {
// Handles a RequestVoteReply that arrives while this server is already leader.
// NOTE(review): the method body is missing from this copy.
187 @Override protected RaftState handleRequestVoteReply(ActorRef sender,
188 RequestVoteReply requestVoteReply) {
192 @Override public RaftState state() {
193 return RaftState.Leader;
// Entry point for every message this behavior receives while leader.
// NOTE(review): closing braces (and possibly other structural lines) are missing
// from this copy, so the exact control flow around scheduleHeartBeat(...) below
// cannot be confirmed here - verify against the original.
196 @Override public RaftState handleMessage(ActorRef sender, Object message) {
197 Preconditions.checkNotNull(sender, "sender should not be null");
199 if (message instanceof RaftRPC) {
200 RaftRPC rpc = (RaftRPC) message;
201 // If RPC request or response contains term T > currentTerm:
202 // set currentTerm = T, convert to follower (§5.1)
203 // This applies to all RPC messages and responses
204 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
205 context.getTermInformation().update(rpc.getTerm(), null);
206 return RaftState.Follower;
// Leader-specific messages. Except for SendHeartBeat, whose result is returned
// directly, the message is afterwards also passed to super.handleMessage.
211 if (message instanceof SendHeartBeat) {
212 return sendHeartBeat();
213 } else if(message instanceof SendInstallSnapshot) {
214 installSnapshotIfNeeded();
215 } else if (message instanceof Replicate) {
216 replicate((Replicate) message);
217 } else if (message instanceof InstallSnapshotReply){
218 // FIXME : Should I be checking the term here too?
219 handleInstallSnapshotReply(
220 (InstallSnapshotReply) message);
// Re-arm the one-shot heartbeat for another HEART_BEAT_INTERVAL.
223 scheduleHeartBeat(HEART_BEAT_INTERVAL);
226 return super.handleMessage(sender, message);
229 private void handleInstallSnapshotReply(InstallSnapshotReply message) {
230 InstallSnapshotReply reply = message;
231 String followerId = reply.getFollowerId();
232 FollowerLogInformation followerLogInformation =
233 followerToLog.get(followerId);
235 followerLogInformation
236 .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
237 followerLogInformation
238 .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
// Handles a Replicate message for the log entry it carries.
// NOTE(review): several lines are missing from this copy (the receiver of the
// ApplyState tell, closing braces, and everything after the tracker is
// constructed); verify against the original.
241 private void replicate(Replicate replicate) {
242 long logIndex = replicate.getReplicatedLogEntry().getIndex();
244 context.getLogger().debug("Replicate message " + logIndex);
// With no followers the leader alone forms the quorum: commit the entry at once
// and send an ApplyState carrying the client actor, identifier and entry.
246 if (followerToActor.size() == 0) {
247 context.setCommitIndex(
248 replicate.getReplicatedLogEntry().getIndex());
251 .tell(new ApplyState(replicate.getClientActor(),
252 replicate.getIdentifier(),
253 replicate.getReplicatedLogEntry()),
258 // Create a tracker entry; it is used later to notify the client actor
261 new ClientRequestTrackerImpl(replicate.getClientActor(),
262 replicate.getIdentifier(),
// Builds an AppendEntries for every follower. The entry list defaults to empty
// and is fetched with getFrom(nextIndex) when that follower's nextIndex is still
// present in the log.
// NOTE(review): the line assigning getFrom(...) to entries, the line sending the
// message to followerActor, and the closing braces are missing from this copy.
270 private void sendAppendEntries() {
271 // Send an AppendEntries to all followers
272 for (String followerId : followerToActor.keySet()) {
273 ActorSelection followerActor =
274 followerToActor.get(followerId);
276 FollowerLogInformation followerLogInformation =
277 followerToLog.get(followerId);
279 long nextIndex = followerLogInformation.getNextIndex().get();
// An empty entry list makes this a heartbeat (see the class javadoc).
281 List<ReplicatedLogEntry> entries = Collections.emptyList();
283 if(context.getReplicatedLog().isPresent(nextIndex)){
285 context.getReplicatedLog().getFrom(nextIndex);
// prevLogIndex/prevLogTerm are helpers defined outside this view; presumably
// they describe the entry preceding nextIndex. commitIndex is sent along.
289 new AppendEntries(currentTerm(), context.getId(),
290 prevLogIndex(nextIndex), prevLogTerm(nextIndex),
291 entries, context.getCommitIndex()
// For each follower whose nextIndex is no longer present in the log but is
// covered by the snapshot, builds an InstallSnapshot carrying the leader's term
// and id plus the snapshot's index, term and contents.
// NOTE(review): the line that sends this message to followerActor, and the
// closing braces, are missing from this copy.
298 private void installSnapshotIfNeeded(){
299 for (String followerId : followerToActor.keySet()) {
300 ActorSelection followerActor =
301 followerToActor.get(followerId);
303 FollowerLogInformation followerLogInformation =
304 followerToLog.get(followerId);
306 long nextIndex = followerLogInformation.getNextIndex().get();
308 if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
310 new InstallSnapshot(currentTerm(), context.getId(),
311 context.getReplicatedLog().getSnapshotIndex(),
312 context.getReplicatedLog().getSnapshotTerm(),
313 context.getReplicatedLog().getSnapshot()
// Handles a SendHeartBeat tick; handleMessage returns this method's result.
// NOTE(review): only the follower-count check of the body is visible in this copy.
321 private RaftState sendHeartBeat() {
322 if (followerToActor.size() > 0) {
328 private void stopHeartBeat() {
329 if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
330 heartbeatSchedule.cancel();
334 private void stopInstallSnapshotSchedule() {
335 if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
336 installSnapshotSchedule.cancel();
// Schedules a one-shot SendHeartBeat message to this actor through the actor
// system's scheduler, presumably after `interval`.
// NOTE(review): lines are missing from this copy - the body of the no-follower
// branch, the interval argument to scheduleOnce, the variable its result is
// assigned to, and closing braces; verify against the original.
340 private void scheduleHeartBeat(FiniteDuration interval) {
341 if(followerToActor.keySet().size() == 0){
342 // Optimization - do not bother scheduling a heartbeat as there are no followers to send it to
349 // Schedule a heartbeat. When the scheduler fires, a SendHeartBeat
350 // message is sent to this actor itself.
351 // The heartbeat is scheduled only once (not periodically) because heartbeats
352 // are unnecessary while other messages are being sent to the remote followers.
355 context.getActorSystem().scheduler().scheduleOnce(
357 context.getActor(), new SendHeartBeat(),
358 context.getActorSystem().dispatcher(), context.getActor());
// Schedules a one-shot SendInstallSnapshot message to this actor; handleMessage
// answers it by calling installSnapshotIfNeeded().
// NOTE(review): lines are missing from this copy (the body of the no-follower
// branch, the interval argument to scheduleOnce, closing braces). Nothing visible
// re-arms this one-shot check after it fires - confirm it is rescheduled elsewhere.
362 private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
363 if(followerToActor.keySet().size() == 0){
364 // Optimization - do not bother scheduling a snapshot check as there are no followers
// Cancel any pending check before scheduling a new one.
369 stopInstallSnapshotSchedule();
371 // Schedule a SendInstallSnapshot message to this actor; on receipt, followers
372 // whose nextIndex is only available in the snapshot are sent an InstallSnapshot
373 installSnapshotSchedule =
374 context.getActorSystem().scheduler().scheduleOnce(
376 context.getActor(), new SendInstallSnapshot(),
377 context.getActorSystem().dispatcher(), context.getActor());
// Closes this behavior.
// NOTE(review): the body is missing from this copy; presumably it cancels the
// pending timers via stopHeartBeat() / stopInstallSnapshotSchedule() - confirm.
382 @Override public void close() throws Exception {
// While in the Leader state, the leader id is this server's own id.
386 @Override public String getLeaderId() {
387 return context.getId();