Properly handle RequestVote in all states
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
16 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
17 import org.opendaylight.controller.cluster.raft.RaftActorContext;
18 import org.opendaylight.controller.cluster.raft.RaftState;
19 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
20 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
21 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
22 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
23 import scala.concurrent.duration.FiniteDuration;
24
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 /**
33  * The behavior of a RaftActor when it is in the Leader state
34  * <p/>
35  * Leaders:
36  * <ul>
37  * <li> Upon election: send initial empty AppendEntries RPCs
38  * (heartbeat) to each server; repeat during idle periods to
39  * prevent election timeouts (§5.2)
40  * <li> If command received from client: append entry to local log,
41  * respond after entry applied to state machine (§5.3)
42  * <li> If last log index ≥ nextIndex for a follower: send
43  * AppendEntries RPC with log entries starting at nextIndex
44  * <ul>
45  * <li> If successful: update nextIndex and matchIndex for
46  * follower (§5.3)
47  * <li> If AppendEntries fails because of log inconsistency:
48  * decrement nextIndex and retry (§5.3)
49  * </ul>
50  * <li> If there exists an N such that N > commitIndex, a majority
51  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
52  * set commitIndex = N (§5.3, §5.4).
53  */
54 public class Leader extends AbstractRaftActorBehavior {
55
56     /**
57      * The interval at which a heart beat message will be sent to the remote
58      * RaftActor
59      * <p/>
60      * Since this is set to 100 milliseconds the Election timeout should be
61      * at least 200 milliseconds
62      */
63     public static final FiniteDuration HEART_BEAT_INTERVAL =
64         new FiniteDuration(100, TimeUnit.MILLISECONDS);
65
66     private final Map<String, FollowerLogInformation> followerToLog =
67         new HashMap();
68
69     private final Map<String, ActorSelection> followerToActor = new HashMap<>();
70
71     private Cancellable heartbeatCancel = null;
72
73     public Leader(RaftActorContext context, List<String> followePaths) {
74         super(context);
75
76         for (String followerPath : followePaths) {
77             FollowerLogInformation followerLogInformation =
78                 new FollowerLogInformationImpl(followerPath,
79                     new AtomicLong(0),
80                     new AtomicLong(0));
81
82             followerToActor.put(followerPath,
83                 context.actorSelection(followerLogInformation.getId()));
84             followerToLog.put(followerPath, followerLogInformation);
85
86         }
87
88         // Immediately schedule a heartbeat
89         // Upon election: send initial empty AppendEntries RPCs
90         // (heartbeat) to each server; repeat during idle periods to
91         // prevent election timeouts (§5.2)
92         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
93
94
95     }
96
97     @Override protected RaftState handleAppendEntries(ActorRef sender,
98         AppendEntries appendEntries, RaftState suggestedState) {
99         return suggestedState;
100     }
101
102     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
103         AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
104         return suggestedState;
105     }
106
107     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
108         RequestVoteReply requestVoteReply, RaftState suggestedState) {
109         return suggestedState;
110     }
111
112     @Override protected RaftState state() {
113         return RaftState.Leader;
114     }
115
116     @Override public RaftState handleMessage(ActorRef sender, Object message) {
117         Preconditions.checkNotNull(sender, "sender should not be null");
118
119         scheduleHeartBeat(HEART_BEAT_INTERVAL);
120
121         if (message instanceof SendHeartBeat) {
122             for (ActorSelection follower : followerToActor.values()) {
123                 follower.tell(new AppendEntries(
124                     context.getTermInformation().getCurrentTerm(),
125                     context.getId(),
126                     context.getReplicatedLog().last().getIndex(),
127                     context.getReplicatedLog().last().getTerm(),
128                     Collections.EMPTY_LIST, context.getCommitIndex()),
129                     context.getActor());
130             }
131             return state();
132         }
133         return super.handleMessage(sender, message);
134     }
135
136     private void scheduleHeartBeat(FiniteDuration interval) {
137         if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
138             heartbeatCancel.cancel();
139         }
140
141         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
142         // message is sent to itself.
143         // Scheduling the heartbeat only once here because heartbeats do not
144         // need to be sent if there are other messages being sent to the remote
145         // actor.
146         heartbeatCancel =
147             context.getActorSystem().scheduler().scheduleOnce(interval,
148                 context.getActor(), new SendHeartBeat(),
149                 context.getActorSystem().dispatcher(), context.getActor());
150     }
151
152 }