import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.raft.PeerInfo;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
private final int votesRequired;
- private final Set<String> peers;
+ private final Collection<String> votingPeers = new ArrayList<>();
public Candidate(RaftActorContext context) {
super(context, RaftState.Candidate);
- peers = context.getPeerAddresses().keySet();
+ for(PeerInfo peer: context.getPeers()) {
+ if(peer.isVoting()) {
+ votingPeers.add(peer.getId());
+ }
+ }
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Election: Candidate has following peers: {}", logName(), peers);
+ LOG.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
}
- votesRequired = getMajorityVoteCount(peers.size());
+ votesRequired = getMajorityVoteCount(votingPeers.size());
startNewTerm();
- if(context.getPeerAddresses().isEmpty()){
- actor().tell(ELECTION_TIMEOUT, actor());
+ if(votingPeers.isEmpty()){
+ actor().tell(ElectionTimeout.INSTANCE, actor());
} else {
scheduleElection(electionDuration());
}
+ }
+ @Override
+ public final String getLeaderId() {
+ return null;
+ }
+ @Override
+ public final short getLeaderPayloadVersion() {
+ return -1;
}
- @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+ @Override
+ protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
}
- return this;
- }
+ // Some other candidate for the same term became a leader and sent us an append entry
+ if(currentTerm() == appendEntries.getTerm()){
+ LOG.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
+ logName(), currentTerm());
- @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply) {
+ return switchBehavior(new Follower(context));
+ }
return this;
}
- @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply) {
+ @Override
+ protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
+ return this;
+ }
- LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply,
- voteCount);
+ @Override
+ protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) {
+ LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
if (requestVoteReply.isVoteGranted()) {
voteCount++;
}
if (voteCount >= votesRequired) {
- return switchBehavior(new Leader(context));
+ return internalSwitchBehavior(RaftState.Leader);
}
return this;
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- return switchBehavior(new Follower(context));
+ return internalSwitchBehavior(RaftState.Follower);
}
}
// who we do not know about (as a peer)
// to send a message to the candidate
- return switchBehavior(new Leader(context));
+ return internalSwitchBehavior(RaftState.Leader);
}
startNewTerm();
scheduleElection(electionDuration());
// Request for a vote
// TODO: Retry request for vote if replies do not arrive in a reasonable
// amount of time TBD
- for (String peerId : peers) {
+ for (String peerId : votingPeers) {
ActorSelection peerActor = context.getPeerActorSelection(peerId);
if(peerActor != null) {
RequestVote requestVote = new RequestVote(
}
}
- @Override public void close() throws Exception {
+ @Override
+ public void close() {
stopElection();
}
}