2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
16 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
17 import org.opendaylight.controller.cluster.raft.RaftActorContext;
18 import org.opendaylight.controller.cluster.raft.RaftState;
19 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
20 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
21 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
22 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
23 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
24 import scala.concurrent.duration.FiniteDuration;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.List;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicLong;
34 * The behavior of a RaftActor when it is in the Leader state
38 * <li> Upon election: send initial empty AppendEntries RPCs
39 * (heartbeat) to each server; repeat during idle periods to
40 * prevent election timeouts (§5.2)
41 * <li> If command received from client: append entry to local log,
42 * respond after entry applied to state machine (§5.3)
43 * <li> If last log index ≥ nextIndex for a follower: send
44 * AppendEntries RPC with log entries starting at nextIndex
46 * <li> If successful: update nextIndex and matchIndex for
48 * <li> If AppendEntries fails because of log inconsistency:
49 * decrement nextIndex and retry (§5.3)
51 * <li> If there exists an N such that N > commitIndex, a majority
52 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
53 * set commitIndex = N (§5.3, §5.4).
55 public class Leader extends AbstractRaftActorBehavior {
58 * The interval at which a heart beat message will be sent to the remote
61 * Since this is set to 100 milliseconds the Election timeout should be
62 * at least 200 milliseconds
64 private static final FiniteDuration HEART_BEAT_INTERVAL =
65 new FiniteDuration(100, TimeUnit.MILLISECONDS);
67 private final Map<String, ActorRef> followerToReplicator = new HashMap<>();
69 private final Map<String, FollowerLogInformation> followerToLog =
72 private final Map<String, ActorSelection> followerToActor = new HashMap<>();
74 private Cancellable heartbeatCancel = null;
76 public Leader(RaftActorContext context, List<String> followers) {
79 for (String follower : followers) {
81 FollowerLogInformation followerLogInformation =
82 new FollowerLogInformationImpl(follower,
86 followerToActor.put(follower,
87 context.actorSelection(followerLogInformation.getId()));
88 followerToLog.put(follower, followerLogInformation);
92 // Immediately schedule a heartbeat
93 // Upon election: send initial empty AppendEntries RPCs
94 // (heartbeat) to each server; repeat during idle periods to
95 // prevent election timeouts (§5.2)
96 scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
101 @Override protected RaftState handleAppendEntries(ActorRef sender,
102 AppendEntries appendEntries, RaftState suggestedState) {
103 return suggestedState;
106 @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
107 AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
108 return suggestedState;
111 @Override protected RaftState handleRequestVote(ActorRef sender,
112 RequestVote requestVote, RaftState suggestedState) {
113 return suggestedState;
116 @Override protected RaftState handleRequestVoteReply(ActorRef sender,
117 RequestVoteReply requestVoteReply, RaftState suggestedState) {
118 return suggestedState;
121 @Override protected RaftState state() {
122 return RaftState.Leader;
125 @Override public RaftState handleMessage(ActorRef sender, Object message) {
126 Preconditions.checkNotNull(sender, "sender should not be null");
128 scheduleHeartBeat(HEART_BEAT_INTERVAL);
130 if (message instanceof SendHeartBeat) {
131 for (ActorSelection follower : followerToActor.values()) {
132 follower.tell(new AppendEntries(
133 context.getTermInformation().getCurrentTerm().get(),
135 context.getReplicatedLog().last().getIndex(),
136 context.getReplicatedLog().last().getTerm(),
137 Collections.EMPTY_LIST, context.getCommitIndex().get()),
142 return super.handleMessage(sender, message);
145 private void scheduleHeartBeat(FiniteDuration interval) {
146 if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
147 heartbeatCancel.cancel();
150 // Schedule a heartbeat. When the scheduler triggers the replicator
151 // will let the RaftActor (leader) know that a new heartbeat needs to be sent
152 // Scheduling the heartbeat only once here because heartbeats do not
153 // need to be sent if there are other messages being sent to the remote
156 context.getActorSystem().scheduler().scheduleOnce(interval,
157 context.getActor(), new SendHeartBeat(),
158 context.getActorSystem().dispatcher(), context.getActor());