package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* The behavior of a RaftActor when it is in the CandidateState
* </ul>
*/
public class Candidate extends AbstractRaftActorBehavior {
- private final List<String> peers;
- public Candidate(RaftActorContext context, List<String> peers) {
+ /**
+ * The maximum election time variance
+ */
+ private static final int ELECTION_TIME_MAX_VARIANCE = 100;
+
+ /**
+ * The interval in which a new election would get triggered if no leader is found
+ */
+ private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
+
+ /**
+ *
+ */
+ private final Map<String, ActorSelection> peerToActor = new HashMap<>();
+
+ private Cancellable electionCancel = null;
+
+ private int voteCount;
+
+ private final int votesRequired;
+
+ public Candidate(RaftActorContext context, List<String> peerPaths) {
super(context);
- this.peers = peers;
+
+ for (String peerPath : peerPaths) {
+ peerToActor.put(peerPath,
+ context.actorSelection(peerPath));
+ }
+
+ if(peerPaths.size() > 0) {
+ // Votes are required from a majority of the peers including self.
+ // The votesRequired field therefore stores a calculated value
+ // of the number of votes required for this candidate to win an
+ // election based on it's known peers.
+ // If a peer was added during normal operation and raft replicas
+ // came to know about them then the new peer would also need to be
+ // taken into consideration when calculating this value.
+ // Here are some examples for what the votesRequired would be for n
+ // peers
+ // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
+ // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
+ // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
+ int noOfPeers = peerPaths.size();
+ int self = 1;
+ votesRequired = (noOfPeers + self) / 2 + 1;
+ } else {
+ votesRequired = 0;
+ }
+
+ scheduleElection(randomizedDuration());
}
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries, RaftState suggestedState) {
+
+ // There is some peer who thinks it's a leader but is not
+ // I will not accept this append entries
+ sender.tell(new AppendEntriesReply(
+ context.getTermInformation().getCurrentTerm().get(), false),
+ context.getActor());
+
return suggestedState;
}
@Override protected RaftState handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
+
+ // Some peer thinks I was a leader and sent me a reply
+
return suggestedState;
}
@Override protected RaftState handleRequestVote(ActorRef sender,
RequestVote requestVote, RaftState suggestedState) {
+
+ // We got this RequestVote because the term in there is less than
+ // or equal to our current term, so do not grant the vote
+ sender.tell(new RequestVoteReply(
+ context.getTermInformation().getCurrentTerm().get(), false),
+ context.getActor());
+
return suggestedState;
}
@Override protected RaftState handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply, RaftState suggestedState) {
- return suggestedState;
+ if(suggestedState == RaftState.Follower) {
+ // If base class thinks I should be follower then I am
+ return suggestedState;
+ }
+
+ if(requestVoteReply.isVoteGranted()){
+ voteCount++;
+ }
+
+ if(voteCount >= votesRequired){
+ return RaftState.Leader;
+ }
+
+ return state();
}
@Override protected RaftState state() {
@Override
public RaftState handleMessage(ActorRef sender, Object message) {
+ if(message instanceof ElectionTimeout){
+ if(votesRequired == 0){
+ // If there are no peers then we should be a Leader
+ // We wait for the election timeout to occur before declare
+ // ourselves the leader. This gives enough time for a leader
+ // who we do not know about (as a peer)
+ // to send a message to the candidate
+ return RaftState.Leader;
+ }
+ scheduleElection(randomizedDuration());
+ return state();
+ }
return super.handleMessage(sender, message);
}
+
+ private FiniteDuration randomizedDuration(){
+ long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
+ return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS);
+ }
+
+ private void scheduleElection(FiniteDuration interval) {
+
+ // set voteCount back to 1 (that is voting for self)
+ voteCount = 1;
+
+ // Increment the election term and vote for self
+ AtomicLong currentTerm = context.getTermInformation().getCurrentTerm();
+ context.getTermInformation().update(currentTerm.incrementAndGet(), context.getId());
+
+ // Request for a vote
+ for(ActorSelection peerActor : peerToActor.values()){
+ peerActor.tell(new RequestVote(
+ context.getTermInformation().getCurrentTerm().get(),
+ context.getId(), context.getReplicatedLog().last().getIndex(),
+ context.getReplicatedLog().last().getTerm()),
+ context.getActor());
+ }
+
+ if (electionCancel != null && !electionCancel.isCancelled()) {
+ electionCancel.cancel();
+ }
+
+ // Schedule an election. When the scheduler triggers an ElectionTimeout
+ // message is sent to itself
+ electionCancel =
+ context.getActorSystem().scheduler().scheduleOnce(interval,
+ context.getActor(), new ElectionTimeout(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
}
* Since this is set to 100 milliseconds the Election timeout should be
* at least 200 milliseconds
*/
- private static final FiniteDuration HEART_BEAT_INTERVAL =
+ public static final FiniteDuration HEART_BEAT_INTERVAL =
new FiniteDuration(100, TimeUnit.MILLISECONDS);
- private final Map<String, ActorRef> followerToReplicator = new HashMap<>();
-
private final Map<String, FollowerLogInformation> followerToLog =
new HashMap();
private Cancellable heartbeatCancel = null;
- public Leader(RaftActorContext context, List<String> followers) {
+ public Leader(RaftActorContext context, List<String> followePaths) {
super(context);
- for (String follower : followers) {
-
+ for (String followerPath : followePaths) {
FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(follower,
+ new FollowerLogInformationImpl(followerPath,
new AtomicLong(0),
new AtomicLong(0));
- followerToActor.put(follower,
+ followerToActor.put(followerPath,
context.actorSelection(followerLogInformation.getId()));
- followerToLog.put(follower, followerLogInformation);
+ followerToLog.put(followerPath, followerLogInformation);
}
heartbeatCancel.cancel();
}
- // Schedule a heartbeat. When the scheduler triggers the replicator
- // will let the RaftActor (leader) know that a new heartbeat needs to be sent
+ // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
+ // message is sent to itself.
// Scheduling the heartbeat only once here because heartbeats do not
// need to be sent if there are other messages being sent to the remote
// actor.
package org.opendaylight.controller.cluster.raft.internal.messages;
-/**
- * Sent to a replicator when log entries need to be replicated to other
- * members in the cluster
- */
-public class Replicate {
+public class ElectionTimeout {
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.internal.messages;
-
-/**
- * Sent by a RaftReplicator to the RaftActor when it has successfully
- * replicated an entry to a remote RaftActor
- */
-public class EntryReplicated {
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.internal.messages;
-
-/**
- * Sent to a replicator when log entries need to be replicated to other
- * members in the cluster
- */
-public class ReplicateEntry {
-}
private ActorRef actor;
private AtomicLong index = new AtomicLong(0);
private AtomicLong lastApplied = new AtomicLong(0);
+ private final ElectionTerm electionTerm;
public MockRaftActorContext(){
-
+ electionTerm = null;
}
public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
this.id = id;
this.system = system;
this.actor = actor;
+
+ electionTerm = new ElectionTermImpl(id);
}
@Override public ActorRef actorOf(Props props) {
}
@Override public ElectionTerm getTermInformation() {
- return new ElectionTermImpl(this.id);
+ return electionTerm;
}
public void setIndex(AtomicLong index){
package org.opendaylight.controller.cluster.raft.behaviors;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import java.util.Arrays;
import java.util.Collections;
+import static org.junit.Assert.assertEquals;
+
public class CandidateTest extends AbstractRaftActorBehaviorTest {
+ private final ActorRef candidateActor = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+ private final ActorRef peerActor1 = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+ private final ActorRef peerActor2 = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+ private final ActorRef peerActor3 = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+ private final ActorRef peerActor4 = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+ @Test
+ public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){
+ RaftActorContext raftActorContext = createActorContext();
+ long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm().get();
+
+ new Candidate(raftActorContext, Collections.EMPTY_LIST);
+
+ assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm().get());
+ assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor());
+ }
+
+ @Test
+ public void testThatAnElectionTimeoutIsTriggered(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof ElectionTimeout) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleElectionTimeoutWhenThereAreZeroPeers(){
+ RaftActorContext raftActorContext = createActorContext();
+ Candidate candidate =
+ new Candidate(raftActorContext, Collections.EMPTY_LIST);
+
+ RaftState raftState =
+ candidate.handleMessage(candidateActor, new ElectionTimeout());
+
+ Assert.assertEquals(RaftState.Leader, raftState);
+ }
+
+ @Test
+ public void testHandleElectionTimeoutWhenThereAreTwoPeers(){
+ RaftActorContext raftActorContext = createActorContext();
+ Candidate candidate =
+ new Candidate(raftActorContext, Arrays
+ .asList(peerActor1.path().toString(),
+ peerActor2.path().toString()));
+
+ RaftState raftState =
+ candidate.handleMessage(candidateActor, new ElectionTimeout());
+
+ Assert.assertEquals(RaftState.Candidate, raftState);
+ }
+
+ @Test
+ public void testBecomeLeaderOnReceivingMajorityVotesInThreePeerCluster(){
+ RaftActorContext raftActorContext = createActorContext();
+ Candidate candidate =
+ new Candidate(raftActorContext, Arrays
+ .asList(peerActor1.path().toString(),
+ peerActor2.path().toString()));
+
+ RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+
+ Assert.assertEquals(RaftState.Leader, stateOnFirstVote);
+
+ }
+
+ @Test
+ public void testBecomeLeaderOnReceivingMajorityVotesInFivePeerCluster(){
+ RaftActorContext raftActorContext = createActorContext();
+ Candidate candidate =
+ new Candidate(raftActorContext, Arrays
+ .asList(peerActor1.path().toString(),
+ peerActor2.path().toString(),
+ peerActor3.path().toString()));
+
+ RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+
+ RaftState stateOnSecondVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+
+ Assert.assertEquals(RaftState.Candidate, stateOnFirstVote);
+ Assert.assertEquals(RaftState.Leader, stateOnSecondVote);
+
+ }
+
+ @Test
+ public void testResponseToAppendEntriesWithLowerTerm(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+
+ candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.EMPTY_LIST, 0));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testResponseToRequestVoteWithLowerTerm(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+
+ candidate.handleMessage(getTestActor(), new RequestVote(0, "test", 0, 0));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ };
+ }};
+ }
+
+
+
@Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
return new Candidate(actorContext, Collections.EMPTY_LIST);
}
+
+ @Override protected RaftActorContext createActorContext() {
+ return new MockRaftActorContext("test", getSystem(), candidateActor);
+ }
+
+ protected RaftActorContext createActorContext(ActorRef candidateActor) {
+ return new MockRaftActorContext("test", getSystem(), candidateActor);
+ }
+
}
package org.opendaylight.controller.cluster.raft.behaviors;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
public class FollowerTest extends AbstractRaftActorBehaviorTest {
+
+ private final ActorRef followerActor = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+
@Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
return new Follower(actorContext);
}
+
+ @Override protected RaftActorContext createActorContext() {
+ return new MockRaftActorContext("test", getSystem(), followerActor);
+ }
+
}