atomic-storage: remove type dependency at segment level I/O
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Candidate.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import org.opendaylight.controller.cluster.raft.PeerInfo;
16 import org.opendaylight.controller.cluster.raft.RaftActorContext;
17 import org.opendaylight.controller.cluster.raft.RaftState;
18 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
19 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
20 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
21 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
23 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
24 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
25 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
26 import scala.concurrent.duration.FiniteDuration;
27
28 /**
29  * The behavior of a RaftActor when it is in the Candidate raft state.
30  *
31  * <p>
32  * Candidates (§5.2):
33  * <ul>
34  * <li> On conversion to candidate, start election:
35  * <ul>
36  * <li> Increment currentTerm
37  * <li> Vote for self
38  * <li> Reset election timer
39  * <li> Send RequestVote RPCs to all other servers
40  * </ul>
41  * <li> If votes received from majority of servers: become leader
42  * <li> If AppendEntries RPC received from new leader: convert to
43  * follower
44  * <li> If election timeout elapses: start new election
45  * </ul>
46  */
47 public class Candidate extends AbstractRaftActorBehavior {
48
49     private int voteCount;
50
51     private final int votesRequired;
52
53     private final Collection<String> votingPeers = new ArrayList<>();
54
55     public Candidate(final RaftActorContext context) {
56         super(context, RaftState.Candidate);
57
58         for (PeerInfo peer: context.getPeers()) {
59             if (peer.isVoting()) {
60                 votingPeers.add(peer.getId());
61             }
62         }
63
64         log.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
65
66         votesRequired = getMajorityVoteCount(votingPeers.size());
67
68         startNewTerm();
69
70         if (votingPeers.isEmpty()) {
71             actor().tell(ElectionTimeout.INSTANCE, actor());
72         } else {
73             scheduleElection(electionDuration());
74         }
75     }
76
77     @Override
78     public final String getLeaderId() {
79         return null;
80     }
81
82     @Override
83     public final short getLeaderPayloadVersion() {
84         return -1;
85     }
86
87     @Override
88     protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
89
90         log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
91
92         // Some other candidate for the same term became a leader and sent us an append entry
93         if (currentTerm() == appendEntries.getTerm()) {
94             log.info("{}: New Leader {} sent an AppendEntries to Candidate for term {} - will switch to Follower",
95                     logName(), appendEntries.getLeaderId(), currentTerm());
96
97             return switchBehavior(new Follower(context));
98         }
99
100         return this;
101     }
102
103     @Override
104     protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
105             final AppendEntriesReply appendEntriesReply) {
106         return this;
107     }
108
109     @Override
110     protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
111         log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
112
113         if (requestVoteReply.isVoteGranted()) {
114             voteCount++;
115         }
116
117         if (voteCount >= votesRequired) {
118             if (context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
119                 log.info("{}: LastApplied index {} is behind last index {} - switching to PreLeader",
120                         logName(), context.getLastApplied(), context.getReplicatedLog().lastIndex());
121                 return internalSwitchBehavior(RaftState.PreLeader);
122             } else {
123                 return internalSwitchBehavior(RaftState.Leader);
124             }
125         }
126
127         return this;
128     }
129
130     @Override
131     protected FiniteDuration electionDuration() {
132         return super.electionDuration().$div(context.getConfigParams().getCandidateElectionTimeoutDivisor());
133     }
134
135
136     @Override
137     final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
138         throw new IllegalStateException("A candidate should never attempt to apply " + entry);
139     }
140
141     @Override
142     public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
143         if (message instanceof ElectionTimeout) {
144             log.debug("{}: Received ElectionTimeout", logName());
145
146             if (votesRequired == 0) {
147                 // If there are no peers then we should be a Leader
148                 // We wait for the election timeout to occur before declare
149                 // ourselves the leader. This gives enough time for a leader
150                 // who we do not know about (as a peer)
151                 // to send a message to the candidate
152
153                 return internalSwitchBehavior(RaftState.Leader);
154             }
155
156             startNewTerm();
157             scheduleElection(electionDuration());
158             return this;
159         }
160
161         if (message instanceof RaftRPC) {
162
163             RaftRPC rpc = (RaftRPC) message;
164
165             log.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
166                         context.getTermInformation().getCurrentTerm());
167
168             // If RPC request or response contains term T > currentTerm:
169             // set currentTerm = T, convert to follower (§5.1)
170             // This applies to all RPC messages and responses
171             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
172                 log.info("{}: Term {} in \"{}\" message is greater than Candidate's term {} - switching to Follower",
173                         logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
174
175                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
176
177                 // The raft paper does not say whether or not a Candidate can/should process a RequestVote in
178                 // this case but doing so gains quicker convergence when the sender's log is more up-to-date.
179                 if (message instanceof RequestVote) {
180                     super.handleMessage(sender, message);
181                 }
182
183                 return internalSwitchBehavior(RaftState.Follower);
184             }
185         }
186
187         return super.handleMessage(sender, message);
188     }
189
190     private void startNewTerm() {
191         // set voteCount back to 1 (that is voting for self)
192         voteCount = 1;
193
194         // Increment the election term and vote for self
195         long currentTerm = context.getTermInformation().getCurrentTerm();
196         long newTerm = currentTerm + 1;
197         context.getTermInformation().updateAndPersist(newTerm, context.getId());
198
199         log.info("{}: Starting new election term {}", logName(), newTerm);
200
201         // Request for a vote
202         // TODO: Retry request for vote if replies do not arrive in a reasonable
203         // amount of time TBD
204         for (String peerId : votingPeers) {
205             ActorSelection peerActor = context.getPeerActorSelection(peerId);
206             if (peerActor != null) {
207                 RequestVote requestVote = new RequestVote(
208                         context.getTermInformation().getCurrentTerm(),
209                         context.getId(),
210                         context.getReplicatedLog().lastIndex(),
211                         context.getReplicatedLog().lastTerm());
212
213                 log.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
214
215                 peerActor.tell(requestVote, context.getActor());
216             }
217         }
218     }
219
220     @Override
221     public void close() {
222         stopElection();
223     }
224 }