Merge "BUG-190 Simplify reconnect logic in protocol-framework."
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Candidate.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import org.opendaylight.controller.cluster.raft.RaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftState;
15 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
16 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
17 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
18 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
19 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
20 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
21
22 import java.util.Set;
23
24 /**
25  * The behavior of a RaftActor when it is in the CandidateState
26  * <p/>
27  * Candidates (§5.2):
28  * <ul>
29  * <li> On conversion to candidate, start election:
30  * <ul>
31  * <li> Increment currentTerm
32  * <li> Vote for self
33  * <li> Reset election timer
34  * <li> Send RequestVote RPCs to all other servers
35  * </ul>
36  * <li> If votes received from majority of servers: become leader
37  * <li> If AppendEntries RPC received from new leader: convert to
38  * follower
39  * <li> If election timeout elapses: start new election
40  * </ul>
41  */
42 public class Candidate extends AbstractRaftActorBehavior {
43
44     private int voteCount;
45
46     private final int votesRequired;
47
48     private final Set<String> peers;
49
50     public Candidate(RaftActorContext context) {
51         super(context);
52
53         peers = context.getPeerAddresses().keySet();
54
55         context.getLogger().debug("Election:Candidate has following peers:"+ peers);
56
57         if(peers.size() > 0) {
58             // Votes are required from a majority of the peers including self.
59             // The votesRequired field therefore stores a calculated value
60             // of the number of votes required for this candidate to win an
61             // election based on it's known peers.
62             // If a peer was added during normal operation and raft replicas
63             // came to know about them then the new peer would also need to be
64             // taken into consideration when calculating this value.
65             // Here are some examples for what the votesRequired would be for n
66             // peers
67             // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
68             // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
69             // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
70             int noOfPeers = peers.size();
71             int self = 1;
72             votesRequired = (noOfPeers + self) / 2 + 1;
73         } else {
74             votesRequired = 0;
75         }
76
77         startNewTerm();
78         scheduleElection(electionDuration());
79     }
80
81     @Override protected RaftState handleAppendEntries(ActorRef sender,
82         AppendEntries appendEntries) {
83
84         context.getLogger().info("Candidate: Received {}", appendEntries.toString());
85
86         return state();
87     }
88
89     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
90         AppendEntriesReply appendEntriesReply) {
91
92         return state();
93     }
94
95     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
96         RequestVoteReply requestVoteReply) {
97
98         if (requestVoteReply.isVoteGranted()) {
99             voteCount++;
100         }
101
102         if (voteCount >= votesRequired) {
103             return RaftState.Leader;
104         }
105
106         return state();
107     }
108
109     @Override public RaftState state() {
110         return RaftState.Candidate;
111     }
112
113     @Override
114     public RaftState handleMessage(ActorRef sender, Object originalMessage) {
115
116         Object message = fromSerializableMessage(originalMessage);
117
118         if (message instanceof RaftRPC) {
119
120             RaftRPC rpc = (RaftRPC) message;
121
122             context.getLogger().debug("RaftRPC message received {} my term is {}", rpc.toString(), context.getTermInformation().getCurrentTerm());
123
124             // If RPC request or response contains term T > currentTerm:
125             // set currentTerm = T, convert to follower (§5.1)
126             // This applies to all RPC messages and responses
127             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
128                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
129                 return RaftState.Follower;
130             }
131         }
132
133         if (message instanceof ElectionTimeout) {
134             if (votesRequired == 0) {
135                 // If there are no peers then we should be a Leader
136                 // We wait for the election timeout to occur before declare
137                 // ourselves the leader. This gives enough time for a leader
138                 // who we do not know about (as a peer)
139                 // to send a message to the candidate
140                 return RaftState.Leader;
141             }
142             startNewTerm();
143             scheduleElection(electionDuration());
144             return state();
145         }
146
147         return super.handleMessage(sender, message);
148     }
149
150
151     private void startNewTerm() {
152
153
154         // set voteCount back to 1 (that is voting for self)
155         voteCount = 1;
156
157         // Increment the election term and vote for self
158         long currentTerm = context.getTermInformation().getCurrentTerm();
159         context.getTermInformation().updateAndPersist(currentTerm + 1,
160             context.getId());
161
162         context.getLogger().debug("Starting new term " + (currentTerm + 1));
163
164         // Request for a vote
165         // TODO: Retry request for vote if replies do not arrive in a reasonable
166         // amount of time TBD
167         for (String peerId : peers) {
168             ActorSelection peerActor = context.getPeerActorSelection(peerId);
169             if(peerActor != null) {
170                 peerActor.tell(new RequestVote(
171                         context.getTermInformation().getCurrentTerm(),
172                         context.getId(),
173                         context.getReplicatedLog().lastIndex(),
174                         context.getReplicatedLog().lastTerm()),
175                     context.getActor()
176                 );
177             }
178         }
179
180
181     }
182
183     @Override public void close() throws Exception {
184         stopElection();
185     }
186 }