72e8e8c2a70b50ed5022b392918bbadc58697eba
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Candidate.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import org.opendaylight.controller.cluster.raft.PeerInfo;
16 import org.opendaylight.controller.cluster.raft.RaftActorContext;
17 import org.opendaylight.controller.cluster.raft.RaftState;
18 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
19 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
20 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
21 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
22 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
23 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
24
25 /**
26  * The behavior of a RaftActor when it is in the CandidateState
27  * <p/>
28  * Candidates (§5.2):
29  * <ul>
30  * <li> On conversion to candidate, start election:
31  * <ul>
32  * <li> Increment currentTerm
33  * <li> Vote for self
34  * <li> Reset election timer
35  * <li> Send RequestVote RPCs to all other servers
36  * </ul>
37  * <li> If votes received from majority of servers: become leader
38  * <li> If AppendEntries RPC received from new leader: convert to
39  * follower
40  * <li> If election timeout elapses: start new election
41  * </ul>
42  */
43 public class Candidate extends AbstractRaftActorBehavior {
44
45     private int voteCount;
46
47     private final int votesRequired;
48
49     private final Collection<String> votingPeers = new ArrayList<>();
50
51     public Candidate(RaftActorContext context) {
52         super(context, RaftState.Candidate);
53
54         for(PeerInfo peer: context.getPeers()) {
55             if(peer.isVoting()) {
56                 votingPeers.add(peer.getId());
57             }
58         }
59
60         if(LOG.isDebugEnabled()) {
61             LOG.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
62         }
63
64         votesRequired = getMajorityVoteCount(votingPeers.size());
65
66         startNewTerm();
67
68         if(votingPeers.isEmpty()){
69             actor().tell(ElectionTimeout.INSTANCE, actor());
70         } else {
71             scheduleElection(electionDuration());
72         }
73
74
75     }
76
77     @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
78         AppendEntries appendEntries) {
79
80         if(LOG.isDebugEnabled()) {
81             LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
82         }
83
84         // Some other candidate for the same term became a leader and sent us an append entry
85         if(currentTerm() == appendEntries.getTerm()){
86             LOG.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
87                     logName(), currentTerm());
88
89             return switchBehavior(new Follower(context));
90         }
91
92         return this;
93     }
94
95     @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
96         AppendEntriesReply appendEntriesReply) {
97
98         return this;
99     }
100
101     @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
102             RequestVoteReply requestVoteReply) {
103
104         LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply,
105                 voteCount);
106
107         if (requestVoteReply.isVoteGranted()) {
108             voteCount++;
109         }
110
111         if (voteCount >= votesRequired) {
112             return internalSwitchBehavior(RaftState.Leader);
113         }
114
115         return this;
116     }
117
118     @Override
119     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
120
121         Object message = fromSerializableMessage(originalMessage);
122
123         if (message instanceof RaftRPC) {
124
125             RaftRPC rpc = (RaftRPC) message;
126
127             if(LOG.isDebugEnabled()) {
128                 LOG.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
129                         context.getTermInformation().getCurrentTerm());
130             }
131
132             // If RPC request or response contains term T > currentTerm:
133             // set currentTerm = T, convert to follower (§5.1)
134             // This applies to all RPC messages and responses
135             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
136                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
137
138                 return internalSwitchBehavior(RaftState.Follower);
139             }
140         }
141
142         if (message instanceof ElectionTimeout) {
143             LOG.debug("{}: Received ElectionTimeout", logName());
144
145             if (votesRequired == 0) {
146                 // If there are no peers then we should be a Leader
147                 // We wait for the election timeout to occur before declare
148                 // ourselves the leader. This gives enough time for a leader
149                 // who we do not know about (as a peer)
150                 // to send a message to the candidate
151
152                 return internalSwitchBehavior(RaftState.Leader);
153             }
154             startNewTerm();
155             scheduleElection(electionDuration());
156             return this;
157         }
158
159         return super.handleMessage(sender, message);
160     }
161
162
163     private void startNewTerm() {
164
165
166         // set voteCount back to 1 (that is voting for self)
167         voteCount = 1;
168
169         // Increment the election term and vote for self
170         long currentTerm = context.getTermInformation().getCurrentTerm();
171         long newTerm = currentTerm + 1;
172         context.getTermInformation().updateAndPersist(newTerm, context.getId());
173
174         LOG.debug("{}: Starting new term {}", logName(), newTerm);
175
176         // Request for a vote
177         // TODO: Retry request for vote if replies do not arrive in a reasonable
178         // amount of time TBD
179         for (String peerId : votingPeers) {
180             ActorSelection peerActor = context.getPeerActorSelection(peerId);
181             if(peerActor != null) {
182                 RequestVote requestVote = new RequestVote(
183                         context.getTermInformation().getCurrentTerm(),
184                         context.getId(),
185                         context.getReplicatedLog().lastIndex(),
186                         context.getReplicatedLog().lastTerm());
187
188                 LOG.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
189
190                 peerActor.tell(requestVote, context.getActor());
191             }
192         }
193     }
194
195     @Override
196     public void close() {
197         stopElection();
198     }
199 }