From 97222f19035815199200e727f43960513073eb9e Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Fri, 11 Jul 2014 17:12:59 -0700 Subject: [PATCH] Complete Candidate behavior implementation Change-Id: I483ec7d0a3430e186decda7ac45258b47132694b Signed-off-by: Moiz Raja --- .../cluster/raft/behaviors/Candidate.java | 141 ++++++++++++- .../cluster/raft/behaviors/Leader.java | 19 +- .../{Replicate.java => ElectionTimeout.java} | 6 +- .../internal/messages/EntryReplicated.java | 16 -- .../internal/messages/ReplicateEntry.java | 16 -- .../cluster/raft/MockRaftActorContext.java | 7 +- .../cluster/raft/behaviors/CandidateTest.java | 195 ++++++++++++++++++ .../cluster/raft/behaviors/FollowerTest.java | 14 ++ 8 files changed, 360 insertions(+), 54 deletions(-) rename opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/{Replicate.java => ElectionTimeout.java} (74%) delete mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/EntryReplicated.java delete mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ReplicateEntry.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index 774691154a..f61905e393 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -9,14 +9,23 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Cancellable; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import scala.concurrent.duration.FiniteDuration; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * The behavior of a RaftActor when it is in the CandidateState @@ -37,31 +46,107 @@ import java.util.List; * */ public class Candidate extends AbstractRaftActorBehavior { - private final List peers; - public Candidate(RaftActorContext context, List peers) { + /** + * The maximum election time variance + */ + private static final int ELECTION_TIME_MAX_VARIANCE = 100; + + /** + * The interval in which a new election would get triggered if no leader is found + */ + private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2; + + /** + * + */ + private final Map peerToActor = new HashMap<>(); + + private Cancellable electionCancel = null; + + private int voteCount; + + private final int votesRequired; + + public Candidate(RaftActorContext context, List peerPaths) { super(context); - this.peers = peers; + + for (String peerPath : peerPaths) { + peerToActor.put(peerPath, + context.actorSelection(peerPath)); + } + + if(peerPaths.size() > 0) { + // Votes are required from a majority of the peers including self. + // The votesRequired field therefore stores a calculated value + // of the number of votes required for this candidate to win an + // election based on it's known peers. + // If a peer was added during normal operation and raft replicas + // came to know about them then the new peer would also need to be + // taken into consideration when calculating this value. + // Here are some examples for what the votesRequired would be for n + // peers + // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1 + // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2 + // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3 + int noOfPeers = peerPaths.size(); + int self = 1; + votesRequired = (noOfPeers + self) / 2 + 1; + } else { + votesRequired = 0; + } + + scheduleElection(randomizedDuration()); } @Override protected RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries, RaftState suggestedState) { + + // There is some peer who thinks it's a leader but is not + // I will not accept this append entries + sender.tell(new AppendEntriesReply( + context.getTermInformation().getCurrentTerm().get(), false), + context.getActor()); + return suggestedState; } @Override protected RaftState handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply, RaftState suggestedState) { + + // Some peer thinks I was a leader and sent me a reply + return suggestedState; } @Override protected RaftState handleRequestVote(ActorRef sender, RequestVote requestVote, RaftState suggestedState) { + + // We got this RequestVote because the term in there is less than + // or equal to our current term, so do not grant the vote + sender.tell(new RequestVoteReply( + context.getTermInformation().getCurrentTerm().get(), false), + context.getActor()); + return suggestedState; } @Override protected RaftState handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply, RaftState suggestedState) { - return suggestedState; + if(suggestedState == RaftState.Follower) { + // If base class thinks I should be follower then I am + return suggestedState; + } + + if(requestVoteReply.isVoteGranted()){ + voteCount++; + } + + if(voteCount >= votesRequired){ + return RaftState.Leader; + } + + return state(); } @Override protected RaftState state() { @@ -70,6 +155,54 @@ public class Candidate extends AbstractRaftActorBehavior { @Override public RaftState handleMessage(ActorRef sender, Object message) { + if(message instanceof ElectionTimeout){ + if(votesRequired == 0){ + // If there are no peers then we should be a Leader + // We wait for the election timeout to occur before declare + // ourselves the leader. This gives enough time for a leader + // who we do not know about (as a peer) + // to send a message to the candidate + return RaftState.Leader; + } + scheduleElection(randomizedDuration()); + return state(); + } return super.handleMessage(sender, message); } + + private FiniteDuration randomizedDuration(){ + long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); + return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS); + } + + private void scheduleElection(FiniteDuration interval) { + + // set voteCount back to 1 (that is voting for self) + voteCount = 1; + + // Increment the election term and vote for self + AtomicLong currentTerm = context.getTermInformation().getCurrentTerm(); + context.getTermInformation().update(currentTerm.incrementAndGet(), context.getId()); + + // Request for a vote + for(ActorSelection peerActor : peerToActor.values()){ + peerActor.tell(new RequestVote( + context.getTermInformation().getCurrentTerm().get(), + context.getId(), context.getReplicatedLog().last().getIndex(), + context.getReplicatedLog().last().getTerm()), + context.getActor()); + } + + if (electionCancel != null && !electionCancel.isCancelled()) { + electionCancel.cancel(); + } + + // Schedule an election. When the scheduler triggers an ElectionTimeout + // message is sent to itself + electionCancel = + context.getActorSystem().scheduler().scheduleOnce(interval, + context.getActor(), new ElectionTimeout(), + context.getActorSystem().dispatcher(), context.getActor()); + } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 6c3eee5415..5c37455be9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -61,11 +61,9 @@ public class Leader extends AbstractRaftActorBehavior { * Since this is set to 100 milliseconds the Election timeout should be * at least 200 milliseconds */ - private static final FiniteDuration HEART_BEAT_INTERVAL = + public static final FiniteDuration HEART_BEAT_INTERVAL = new FiniteDuration(100, TimeUnit.MILLISECONDS); - private final Map followerToReplicator = new HashMap<>(); - private final Map followerToLog = new HashMap(); @@ -73,19 +71,18 @@ public class Leader extends AbstractRaftActorBehavior { private Cancellable heartbeatCancel = null; - public Leader(RaftActorContext context, List followers) { + public Leader(RaftActorContext context, List followePaths) { super(context); - for (String follower : followers) { - + for (String followerPath : followePaths) { FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(follower, + new FollowerLogInformationImpl(followerPath, new AtomicLong(0), new AtomicLong(0)); - followerToActor.put(follower, + followerToActor.put(followerPath, context.actorSelection(followerLogInformation.getId())); - followerToLog.put(follower, followerLogInformation); + followerToLog.put(followerPath, followerLogInformation); } @@ -147,8 +144,8 @@ public class Leader extends AbstractRaftActorBehavior { heartbeatCancel.cancel(); } - // Schedule a heartbeat. When the scheduler triggers the replicator - // will let the RaftActor (leader) know that a new heartbeat needs to be sent + // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat + // message is sent to itself. // Scheduling the heartbeat only once here because heartbeats do not // need to be sent if there are other messages being sent to the remote // actor. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ElectionTimeout.java similarity index 74% rename from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java rename to opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ElectionTimeout.java index ee08bc1b41..0a4b8fa669 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ElectionTimeout.java @@ -8,9 +8,5 @@ package org.opendaylight.controller.cluster.raft.internal.messages; -/** - * Sent to a replicator when log entries need to be replicated to other - * members in the cluster - */ -public class Replicate { +public class ElectionTimeout { } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/EntryReplicated.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/EntryReplicated.java deleted file mode 100644 index da28024c2b..0000000000 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/EntryReplicated.java +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.raft.internal.messages; - -/** - * Sent by a RaftReplicator to the RaftActor when it has successfully - * replicated an entry to a remote RaftActor - */ -public class EntryReplicated { -} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ReplicateEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ReplicateEntry.java deleted file mode 100644 index 8fe235c4b9..0000000000 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ReplicateEntry.java +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.raft.internal.messages; - -/** - * Sent to a replicator when log entries need to be replicated to other - * members in the cluster - */ -public class ReplicateEntry { -} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index d39eeec411..c265388f6b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -22,15 +22,18 @@ public class MockRaftActorContext implements RaftActorContext { private ActorRef actor; private AtomicLong index = new AtomicLong(0); private AtomicLong lastApplied = new AtomicLong(0); + private final ElectionTerm electionTerm; public MockRaftActorContext(){ - + electionTerm = null; } public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){ this.id = id; this.system = system; this.actor = actor; + + electionTerm = new ElectionTermImpl(id); } @Override public ActorRef actorOf(Props props) { @@ -50,7 +53,7 @@ public class MockRaftActorContext implements RaftActorContext { } @Override public ElectionTerm getTermInformation() { - return new ElectionTermImpl(this.id); + return electionTerm; } public void setIndex(AtomicLong index){ diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 6a3040d843..9efdbd7c54 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -1,12 +1,207 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import junit.framework.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; +import java.util.Arrays; import java.util.Collections; +import static org.junit.Assert.assertEquals; + public class CandidateTest extends AbstractRaftActorBehaviorTest { + private final ActorRef candidateActor = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + private final ActorRef peerActor1 = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + private final ActorRef peerActor2 = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + private final ActorRef peerActor3 = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + private final ActorRef peerActor4 = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + @Test + public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){ + RaftActorContext raftActorContext = createActorContext(); + long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm().get(); + + new Candidate(raftActorContext, Collections.EMPTY_LIST); + + assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm().get()); + assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor()); + } + + @Test + public void testThatAnElectionTimeoutIsTriggered(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "ElectionTimeout") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof ElectionTimeout) { + return true; + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + } + }; + }}; + } + + @Test + public void testHandleElectionTimeoutWhenThereAreZeroPeers(){ + RaftActorContext raftActorContext = createActorContext(); + Candidate candidate = + new Candidate(raftActorContext, Collections.EMPTY_LIST); + + RaftState raftState = + candidate.handleMessage(candidateActor, new ElectionTimeout()); + + Assert.assertEquals(RaftState.Leader, raftState); + } + + @Test + public void testHandleElectionTimeoutWhenThereAreTwoPeers(){ + RaftActorContext raftActorContext = createActorContext(); + Candidate candidate = + new Candidate(raftActorContext, Arrays + .asList(peerActor1.path().toString(), + peerActor2.path().toString())); + + RaftState raftState = + candidate.handleMessage(candidateActor, new ElectionTimeout()); + + Assert.assertEquals(RaftState.Candidate, raftState); + } + + @Test + public void testBecomeLeaderOnReceivingMajorityVotesInThreePeerCluster(){ + RaftActorContext raftActorContext = createActorContext(); + Candidate candidate = + new Candidate(raftActorContext, Arrays + .asList(peerActor1.path().toString(), + peerActor2.path().toString())); + + RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); + + Assert.assertEquals(RaftState.Leader, stateOnFirstVote); + + } + + @Test + public void testBecomeLeaderOnReceivingMajorityVotesInFivePeerCluster(){ + RaftActorContext raftActorContext = createActorContext(); + Candidate candidate = + new Candidate(raftActorContext, Arrays + .asList(peerActor1.path().toString(), + peerActor2.path().toString(), + peerActor3.path().toString())); + + RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); + + RaftState stateOnSecondVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); + + Assert.assertEquals(RaftState.Candidate, stateOnFirstVote); + Assert.assertEquals(RaftState.Leader, stateOnSecondVote); + + } + + @Test + public void testResponseToAppendEntriesWithLowerTerm(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST); + + candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.EMPTY_LIST, 0)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesResponse") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + }; + }}; + } + + @Test + public void testResponseToRequestVoteWithLowerTerm(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST); + + candidate.handleMessage(getTestActor(), new RequestVote(0, "test", 0, 0)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesResponse") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + }; + }}; + } + + + @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { return new Candidate(actorContext, Collections.EMPTY_LIST); } + + @Override protected RaftActorContext createActorContext() { + return new MockRaftActorContext("test", getSystem(), candidateActor); + } + + protected RaftActorContext createActorContext(ActorRef candidateActor) { + return new MockRaftActorContext("test", getSystem(), candidateActor); + } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index cd3bd2833d..5f4f3632ca 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -1,9 +1,23 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import akka.actor.ActorRef; +import akka.actor.Props; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; public class FollowerTest extends AbstractRaftActorBehaviorTest { + + private final ActorRef followerActor = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { return new Follower(actorContext); } + + @Override protected RaftActorContext createActorContext() { + return new MockRaftActorContext("test", getSystem(), followerActor); + } + } -- 2.36.6