Complete Candidate behavior implementation 61/8961/3
authorMoiz Raja <moraja@cisco.com>
Sat, 12 Jul 2014 00:12:59 +0000 (17:12 -0700)
committerMoiz Raja <moraja@cisco.com>
Sat, 26 Jul 2014 22:03:57 +0000 (15:03 -0700)
Change-Id: I483ec7d0a3430e186decda7ac45258b47132694b
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ElectionTimeout.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java with 74% similarity]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/EntryReplicated.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ReplicateEntry.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index 7746911..f61905e 100644 (file)
@@ -9,14 +9,23 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * The behavior of a RaftActor when it is in the CandidateState
@@ -37,31 +46,107 @@ import java.util.List;
  * </ul>
  */
 public class Candidate extends AbstractRaftActorBehavior {
-    private final List<String> peers;
 
-    public Candidate(RaftActorContext context, List<String> peers) {
+    /**
+     * The maximum election time variance
+     */
+    private static final int ELECTION_TIME_MAX_VARIANCE = 100;
+
+    /**
+     * The interval in which a new election would get triggered if no leader is found
+     */
+    private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
+
+    /**
+     *
+     */
+    private final Map<String, ActorSelection> peerToActor = new HashMap<>();
+
+    private Cancellable electionCancel = null;
+
+    private int voteCount;
+
+    private final int votesRequired;
+
+    public Candidate(RaftActorContext context, List<String> peerPaths) {
         super(context);
-        this.peers = peers;
+
+        for (String peerPath : peerPaths) {
+            peerToActor.put(peerPath,
+                context.actorSelection(peerPath));
+        }
+
+        if(peerPaths.size() > 0) {
+            // Votes are required from a majority of the peers including self.
+            // The votesRequired field therefore stores a calculated value
+            // of the number of votes required for this candidate to win an
+            // election based on it's known peers.
+            // If a peer was added during normal operation and raft replicas
+            // came to know about them then the new peer would also need to be
+            // taken into consideration when calculating this value.
+            // Here are some examples for what the votesRequired would be for n
+            // peers
+            // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
+            // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
+            // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
+            int noOfPeers = peerPaths.size();
+            int self = 1;
+            votesRequired = (noOfPeers + self) / 2 + 1;
+        } else {
+            votesRequired = 0;
+        }
+
+        scheduleElection(randomizedDuration());
     }
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries, RaftState suggestedState) {
+
+        // There is some peer who thinks it's a leader but is not
+        // I will not accept this append entries
+        sender.tell(new AppendEntriesReply(
+            context.getTermInformation().getCurrentTerm().get(), false),
+            context.getActor());
+
         return suggestedState;
     }
 
     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
+
+        // Some peer thinks I was a leader and sent me a reply
+
         return suggestedState;
     }
 
     @Override protected RaftState handleRequestVote(ActorRef sender,
         RequestVote requestVote, RaftState suggestedState) {
+
+        // We got this RequestVote because the term in there is less than
+        // or equal to our current term, so do not grant the vote
+        sender.tell(new RequestVoteReply(
+            context.getTermInformation().getCurrentTerm().get(), false),
+            context.getActor());
+
         return suggestedState;
     }
 
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply, RaftState suggestedState) {
-        return suggestedState;
+        if(suggestedState == RaftState.Follower) {
+            // If base class thinks I should be follower then I am
+            return suggestedState;
+        }
+
+        if(requestVoteReply.isVoteGranted()){
+            voteCount++;
+        }
+
+        if(voteCount >= votesRequired){
+            return RaftState.Leader;
+        }
+
+        return state();
     }
 
     @Override protected RaftState state() {
@@ -70,6 +155,54 @@ public class Candidate extends AbstractRaftActorBehavior {
 
     @Override
     public RaftState handleMessage(ActorRef sender, Object message) {
+        if(message instanceof ElectionTimeout){
+            if(votesRequired == 0){
+                // If there are no peers then we should be a Leader
+                // We wait for the election timeout to occur before declare
+                // ourselves the leader. This gives enough time for a leader
+                // who we do not know about (as a peer)
+                // to send a message to the candidate
+                return RaftState.Leader;
+            }
+            scheduleElection(randomizedDuration());
+            return state();
+        }
         return super.handleMessage(sender, message);
     }
+
+    private FiniteDuration randomizedDuration(){
+        long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
+        return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS);
+    }
+
+    private void scheduleElection(FiniteDuration interval) {
+
+        // set voteCount back to 1 (that is voting for self)
+        voteCount = 1;
+
+        // Increment the election term and vote for self
+        AtomicLong currentTerm = context.getTermInformation().getCurrentTerm();
+        context.getTermInformation().update(currentTerm.incrementAndGet(), context.getId());
+
+        // Request for a vote
+        for(ActorSelection peerActor : peerToActor.values()){
+            peerActor.tell(new RequestVote(
+                context.getTermInformation().getCurrentTerm().get(),
+                context.getId(), context.getReplicatedLog().last().getIndex(),
+                context.getReplicatedLog().last().getTerm()),
+                context.getActor());
+        }
+
+        if (electionCancel != null && !electionCancel.isCancelled()) {
+            electionCancel.cancel();
+        }
+
+        // Schedule an election. When the scheduler triggers an ElectionTimeout
+        // message is sent to itself
+        electionCancel =
+            context.getActorSystem().scheduler().scheduleOnce(interval,
+                context.getActor(), new ElectionTimeout(),
+                context.getActorSystem().dispatcher(), context.getActor());
+    }
+
 }
index 6c3eee5..5c37455 100644 (file)
@@ -61,11 +61,9 @@ public class Leader extends AbstractRaftActorBehavior {
      * Since this is set to 100 milliseconds the Election timeout should be
      * at least 200 milliseconds
      */
-    private static final FiniteDuration HEART_BEAT_INTERVAL =
+    public static final FiniteDuration HEART_BEAT_INTERVAL =
         new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
-    private final Map<String, ActorRef> followerToReplicator = new HashMap<>();
-
     private final Map<String, FollowerLogInformation> followerToLog =
         new HashMap();
 
@@ -73,19 +71,18 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private Cancellable heartbeatCancel = null;
 
-    public Leader(RaftActorContext context, List<String> followers) {
+    public Leader(RaftActorContext context, List<String> followePaths) {
         super(context);
 
-        for (String follower : followers) {
-
+        for (String followerPath : followePaths) {
             FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(follower,
+                new FollowerLogInformationImpl(followerPath,
                     new AtomicLong(0),
                     new AtomicLong(0));
 
-            followerToActor.put(follower,
+            followerToActor.put(followerPath,
                 context.actorSelection(followerLogInformation.getId()));
-            followerToLog.put(follower, followerLogInformation);
+            followerToLog.put(followerPath, followerLogInformation);
 
         }
 
@@ -147,8 +144,8 @@ public class Leader extends AbstractRaftActorBehavior {
             heartbeatCancel.cancel();
         }
 
-        // Schedule a heartbeat. When the scheduler triggers the replicator
-        // will let the RaftActor (leader) know that a new heartbeat needs to be sent
+        // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
+        // message is sent to itself.
         // Scheduling the heartbeat only once here because heartbeats do not
         // need to be sent if there are other messages being sent to the remote
         // actor.
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/EntryReplicated.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/EntryReplicated.java
deleted file mode 100644 (file)
index da28024..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.internal.messages;
-
-/**
- * Sent by a RaftReplicator to the RaftActor when it has successfully
- * replicated an entry to a remote RaftActor
- */
-public class EntryReplicated {
-}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ReplicateEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ReplicateEntry.java
deleted file mode 100644 (file)
index 8fe235c..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.internal.messages;
-
-/**
- * Sent to a replicator when log entries need to be replicated to other
- * members in the cluster
- */
-public class ReplicateEntry {
-}
index d39eeec..c265388 100644 (file)
@@ -22,15 +22,18 @@ public class MockRaftActorContext implements RaftActorContext {
     private ActorRef actor;
     private AtomicLong index = new AtomicLong(0);
     private AtomicLong lastApplied = new AtomicLong(0);
+    private final ElectionTerm electionTerm;
 
     public MockRaftActorContext(){
-
+        electionTerm = null;
     }
 
     public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
         this.id = id;
         this.system = system;
         this.actor = actor;
+
+        electionTerm = new ElectionTermImpl(id);
     }
 
     @Override public ActorRef actorOf(Props props) {
@@ -50,7 +53,7 @@ public class MockRaftActorContext implements RaftActorContext {
     }
 
     @Override public ElectionTerm getTermInformation() {
-        return new ElectionTermImpl(this.id);
+        return electionTerm;
     }
 
     public void setIndex(AtomicLong index){
index 6a3040d..9efdbd7 100644 (file)
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
+import java.util.Arrays;
 import java.util.Collections;
 
+import static org.junit.Assert.assertEquals;
+
 public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
+    private final ActorRef candidateActor = getSystem().actorOf(Props.create(
+        DoNothingActor.class));
+
+    private final ActorRef peerActor1 = getSystem().actorOf(Props.create(
+        DoNothingActor.class));
+
+    private final ActorRef peerActor2 = getSystem().actorOf(Props.create(
+        DoNothingActor.class));
+
+    private final ActorRef peerActor3 = getSystem().actorOf(Props.create(
+        DoNothingActor.class));
+
+    private final ActorRef peerActor4 = getSystem().actorOf(Props.create(
+        DoNothingActor.class));
+
+    @Test
+    public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){
+        RaftActorContext raftActorContext = createActorContext();
+        long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm().get();
+
+        new Candidate(raftActorContext, Collections.EMPTY_LIST);
+
+        assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm().get());
+        assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor());
+    }
+
+    @Test
+    public void testThatAnElectionTimeoutIsTriggered(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+
+                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof ElectionTimeout) {
+                                 return true;
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get();
+
+                    assertEquals(true, out);
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testHandleElectionTimeoutWhenThereAreZeroPeers(){
+        RaftActorContext raftActorContext = createActorContext();
+        Candidate candidate =
+            new Candidate(raftActorContext, Collections.EMPTY_LIST);
+
+        RaftState raftState =
+            candidate.handleMessage(candidateActor, new ElectionTimeout());
+
+        Assert.assertEquals(RaftState.Leader, raftState);
+    }
+
+    @Test
+    public void testHandleElectionTimeoutWhenThereAreTwoPeers(){
+        RaftActorContext raftActorContext = createActorContext();
+        Candidate candidate =
+            new Candidate(raftActorContext, Arrays
+                .asList(peerActor1.path().toString(),
+                    peerActor2.path().toString()));
+
+        RaftState raftState =
+            candidate.handleMessage(candidateActor, new ElectionTimeout());
+
+        Assert.assertEquals(RaftState.Candidate, raftState);
+    }
+
+    @Test
+    public void testBecomeLeaderOnReceivingMajorityVotesInThreePeerCluster(){
+        RaftActorContext raftActorContext = createActorContext();
+        Candidate candidate =
+            new Candidate(raftActorContext, Arrays
+                .asList(peerActor1.path().toString(),
+                    peerActor2.path().toString()));
+
+        RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+
+        Assert.assertEquals(RaftState.Leader, stateOnFirstVote);
+
+    }
+
+    @Test
+    public void testBecomeLeaderOnReceivingMajorityVotesInFivePeerCluster(){
+        RaftActorContext raftActorContext = createActorContext();
+        Candidate candidate =
+            new Candidate(raftActorContext, Arrays
+                .asList(peerActor1.path().toString(),
+                    peerActor2.path().toString(),
+                    peerActor3.path().toString()));
+
+        RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+
+        RaftState stateOnSecondVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+
+        Assert.assertEquals(RaftState.Candidate, stateOnFirstVote);
+        Assert.assertEquals(RaftState.Leader, stateOnSecondVote);
+
+    }
+
+    @Test
+    public void testResponseToAppendEntriesWithLowerTerm(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+
+                    candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.EMPTY_LIST, 0));
+
+                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof AppendEntriesReply) {
+                                AppendEntriesReply reply = (AppendEntriesReply) in;
+                                return reply.isSuccess();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get();
+
+                    assertEquals(false, out);
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testResponseToRequestVoteWithLowerTerm(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST);
+
+                    candidate.handleMessage(getTestActor(), new RequestVote(0, "test", 0, 0));
+
+                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof RequestVoteReply) {
+                                RequestVoteReply reply = (RequestVoteReply) in;
+                                return reply.isVoteGranted();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get();
+
+                    assertEquals(false, out);
+                }
+            };
+        }};
+    }
+
+
+
     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
         return new Candidate(actorContext, Collections.EMPTY_LIST);
     }
+
+    @Override protected RaftActorContext createActorContext() {
+        return new MockRaftActorContext("test", getSystem(), candidateActor);
+    }
+
+    protected RaftActorContext createActorContext(ActorRef candidateActor) {
+        return new MockRaftActorContext("test", getSystem(), candidateActor);
+    }
+
 }
index cd3bd28..5f4f363 100644 (file)
@@ -1,9 +1,23 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
 public class FollowerTest extends AbstractRaftActorBehaviorTest {
+
+    private final ActorRef followerActor = getSystem().actorOf(Props.create(
+        DoNothingActor.class));
+
+
     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
         return new Follower(actorContext);
     }
+
+    @Override protected RaftActorContext createActorContext() {
+        return new MockRaftActorContext("test", getSystem(), followerActor);
+    }
+
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.