Properly handle RequestVote in all states 72/8972/3
authorMoiz Raja <moraja@cisco.com>
Sat, 12 Jul 2014 12:39:52 +0000 (05:39 -0700)
committerMoiz Raja <moraja@cisco.com>
Sat, 26 Jul 2014 22:07:14 +0000 (15:07 -0700)
Change-Id: I31987824a2219be230e439ecb152e031c749d4ef
Signed-off-by: Moiz Raja <moraja@cisco.com>
13 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index 4b0367f0ad1e0e6059db47f372d8b9a79775606c..664ab5e7b29868014a8a913dc519313f90865de6 100644 (file)
@@ -8,8 +8,6 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * ElectionTerm contains information about a RaftActors election term.
  * <p>
@@ -23,7 +21,7 @@ public interface ElectionTerm {
      * latest term server has seen (initialized to 0
      * on first boot, increases monotonically)
      */
-    AtomicLong getCurrentTerm();
+    long getCurrentTerm();
 
     /**
      * candidateId that received vote in current
index e75e0c5bb61f2e35881242df7206c2bb0a4e1704..6a598be680b3607e96952a47edf7d03db7226fa0 100644 (file)
@@ -8,15 +8,13 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 public class ElectionTermImpl implements ElectionTerm{
     /**
      * Identifier of the actor whose election term information this is
      */
     private final String id;
 
-    private AtomicLong currentTerm;
+    private long currentTerm;
 
     private String votedFor;
 
@@ -24,13 +22,13 @@ public class ElectionTermImpl implements ElectionTerm{
         this.id = id;
 
         // TODO: Read currentTerm from some persistent state
-        currentTerm = new AtomicLong(0);
+        currentTerm = 0;
 
         // TODO: Read votedFor from some file
         votedFor = "";
     }
 
-    public AtomicLong getCurrentTerm() {
+    public long getCurrentTerm() {
         return currentTerm;
     }
 
@@ -39,7 +37,7 @@ public class ElectionTermImpl implements ElectionTerm{
     }
 
     public void update(long currentTerm, String votedFor){
-        this.currentTerm.set(currentTerm);
+        this.currentTerm = currentTerm;
         this.votedFor = votedFor;
 
         // TODO : Write to some persistent state
index 6e8e8208fb940d08940de1a8bd40f008c7258ece..5e628bdd1bb035342529b64702c707bf6a458696 100644 (file)
@@ -15,7 +15,6 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
 import java.util.Collections;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
@@ -77,7 +76,7 @@ public abstract class RaftActor extends UntypedEventsourcedProcessor {
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(),
             id, new ElectionTermImpl(id),
-            new AtomicLong(0), new AtomicLong(0), new ReplicatedLogImpl());
+            0, 0, new ReplicatedLogImpl());
         currentBehavior = switchBehavior(RaftState.Follower);
     }
 
index 554461a76c7480d27d5ef5daaf9cd6f814d5e2ff..4bc9162fb5d0811d8f7cba3642b0ca98fe1e1110 100644 (file)
@@ -13,8 +13,6 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * The RaftActorContext contains that portion of the RaftActors state that
  * needs to be shared with it's behaviors. A RaftActorContext should NEVER be
@@ -62,7 +60,13 @@ public interface RaftActorContext {
      *    monotonically)
      * @return
      */
-    AtomicLong getCommitIndex();
+    long getCommitIndex();
+
+
+    /**
+     *
+     */
+    void setCommitIndex(long commitIndex);
 
     /**
      * index of highest log entry applied to state
@@ -70,7 +74,13 @@ public interface RaftActorContext {
      *    monotonically)
      * @return
      */
-    AtomicLong getLastApplied();
+    long getLastApplied();
+
+
+    /**
+     *
+     */
+    void setLastApplied(long lastApplied);
 
     /**
      * @return A representation of the log
index 1fdc3c628fcb659fd26074d307709c0b1726801b..845011a7e372548999c9b643d26630528543e473 100644 (file)
@@ -14,8 +14,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActorContext;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 public class RaftActorContextImpl implements RaftActorContext{
 
     private final ActorRef actor;
@@ -26,16 +24,16 @@ public class RaftActorContextImpl implements RaftActorContext{
 
     private final ElectionTerm termInformation;
 
-    private final AtomicLong commitIndex;
+    private long commitIndex;
 
-    private final AtomicLong lastApplied;
+    private long lastApplied;
 
     private final ReplicatedLog replicatedLog;
 
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
         String id,
-        ElectionTerm termInformation, AtomicLong commitIndex,
-        AtomicLong lastApplied, ReplicatedLog replicatedLog) {
+        ElectionTerm termInformation, long commitIndex,
+        long lastApplied, ReplicatedLog replicatedLog) {
         this.actor = actor;
         this.context = context;
         this.id = id;
@@ -65,14 +63,22 @@ public class RaftActorContextImpl implements RaftActorContext{
         return termInformation;
     }
 
-    public AtomicLong getCommitIndex() {
+    public long getCommitIndex() {
         return commitIndex;
     }
 
-    public AtomicLong getLastApplied() {
+    @Override public void setCommitIndex(long commitIndex) {
+        this.commitIndex = commitIndex;
+    }
+
+    public long getLastApplied() {
         return lastApplied;
     }
 
+    @Override public void setLastApplied(long lastApplied) {
+        this.lastApplied = lastApplied;
+    }
+
     @Override public ReplicatedLog getReplicatedLog() {
         return replicatedLog;
     }
index d7a8d5abb35bdb258633b65a783aed9e0aa9677d..ae1baec471ef1d08abdcbb6d9c56052b95a87ffa 100644 (file)
@@ -9,13 +9,19 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Abstract class that represents the behavior of a RaftActor
@@ -34,6 +40,22 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected final RaftActorContext context;
 
+    /**
+     * The maximum election time variance
+     */
+    private static final int ELECTION_TIME_MAX_VARIANCE = 100;
+
+    /**
+     * The interval in which a new election would get triggered if no leader is found
+     */
+    private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
+
+    /**
+     *
+     */
+
+    private Cancellable electionCancel = null;
+
 
     protected AbstractRaftActorBehavior(RaftActorContext context) {
         this.context = context;
@@ -76,23 +98,45 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply, RaftState suggestedState);
 
-    /**
-     * Derived classes should not directly handle RequestVote messages it
-     * should let the base class handle it first. Once the base class handles
-     * the RequestVote message and does the common actions that are applicable
-     * in all RaftState's it will delegate the handling of the RequestVote
-     * message to the derived class to do more state specific handling by calling
-     * this method
-     *
-     * @param sender         The actor that sent this message
-     * @param requestVote    The RequestVote message
-     * @param suggestedState The state that the RaftActor should be in based
-     *                       on the base class's processing of the RequestVote
-     *                       message
-     * @return
-     */
-    protected abstract RaftState handleRequestVote(ActorRef sender,
-        RequestVote requestVote, RaftState suggestedState);
+    protected RaftState handleRequestVote(ActorRef sender,
+        RequestVote requestVote, RaftState suggestedState){
+
+        boolean grantVote = false;
+
+        //  Reply false if term < currentTerm (§5.1)
+        if(requestVote.getTerm() < currentTerm()){
+            grantVote = false;
+
+        // If votedFor is null or candidateId, and candidate’s log is at
+        // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
+        } else if (votedFor() == null || votedFor().equals(requestVote.getCandidateId())) {
+
+            boolean candidateLatest = false;
+
+            // From §5.4.1
+            // Raft determines which of two logs is more up-to-date
+            // by comparing the index and term of the last entries in the
+            // logs. If the logs have last entries with different terms, then
+            // the log with the later term is more up-to-date. If the logs
+            // end with the same term, then whichever log is longer is
+            // more up-to-date.
+            if(requestVote.getLastLogTerm() > lastTerm()){
+                candidateLatest = true;
+            } else if((requestVote.getLastLogTerm() == lastTerm()) && requestVote.getLastLogIndex() >= lastTerm()){
+                candidateLatest = true;
+            }
+
+            if(candidateLatest) {
+                grantVote = true;
+                context.getTermInformation().update(requestVote.getTerm(),
+                    requestVote.getCandidateId());
+            }
+        }
+
+        sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
+
+        return suggestedState;
+    }
 
     /**
      * Derived classes should not directly handle RequestVoteReply messages it
@@ -119,6 +163,46 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected abstract RaftState state();
 
+    protected FiniteDuration electionDuration(){
+        long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
+        return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS);
+    }
+
+    protected void scheduleElection(FiniteDuration interval) {
+
+        if (electionCancel != null && !electionCancel.isCancelled()) {
+            electionCancel.cancel();
+        }
+
+        // Schedule an election. When the scheduler triggers an ElectionTimeout
+        // message is sent to itself
+        electionCancel =
+            context.getActorSystem().scheduler().scheduleOnce(interval,
+                context.getActor(), new ElectionTimeout(),
+                context.getActorSystem().dispatcher(), context.getActor());
+    }
+
+    protected long currentTerm(){
+        return context.getTermInformation().getCurrentTerm();
+    }
+
+    protected String votedFor(){
+        return context.getTermInformation().getVotedFor();
+    }
+
+    protected ActorRef actor(){
+        return context.getActor();
+    }
+
+    protected long lastTerm() {
+        return context.getReplicatedLog().last().getTerm();
+    }
+
+    protected long lastIndex() {
+        return context.getReplicatedLog().last().getIndex();
+    }
+
+
     @Override
     public RaftState handleMessage(ActorRef sender, Object message) {
         RaftState raftState = state();
@@ -127,8 +211,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         }
         if (message instanceof AppendEntries) {
             AppendEntries appendEntries = (AppendEntries) message;
-            if (appendEntries.getLeaderCommit() > context.getLastApplied()
-                .get()) {
+            if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
                 applyLogToStateMachine(appendEntries.getLeaderCommit());
             }
             raftState = handleAppendEntries(sender, appendEntries, raftState);
@@ -148,8 +231,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     private RaftState applyTerm(RaftRPC rpc) {
-        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()
-            .get()) {
+        // If RPC request or response contains term T > currentTerm:
+        // set currentTerm = T, convert to follower (§5.1)
+        // This applies to all RPC messages and responses
+        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
             context.getTermInformation().update(rpc.getTerm(), null);
             return RaftState.Follower;
         }
@@ -157,6 +242,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     private void applyLogToStateMachine(long index) {
-        context.getLastApplied().set(index);
+        // Send a local message to the local RaftActor (it's derived class to be
+        // specific to apply the log to it's index)
+        context.setLastApplied(index);
     }
+
+
 }
index f61905e393ff9e391c31ecccae9ac0bdfd1612ea..3e6b50263122a83bc8252807d65bd25700b1e5f7 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
@@ -18,14 +17,10 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * The behavior of a RaftActor when it is in the CandidateState
@@ -47,23 +42,8 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class Candidate extends AbstractRaftActorBehavior {
 
-    /**
-     * The maximum election time variance
-     */
-    private static final int ELECTION_TIME_MAX_VARIANCE = 100;
-
-    /**
-     * The interval in which a new election would get triggered if no leader is found
-     */
-    private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
-
-    /**
-     *
-     */
     private final Map<String, ActorSelection> peerToActor = new HashMap<>();
 
-    private Cancellable electionCancel = null;
-
     private int voteCount;
 
     private final int votesRequired;
@@ -96,7 +76,8 @@ public class Candidate extends AbstractRaftActorBehavior {
             votesRequired = 0;
         }
 
-        scheduleElection(randomizedDuration());
+        startNewTerm();
+        scheduleElection(electionDuration());
     }
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
@@ -105,7 +86,7 @@ public class Candidate extends AbstractRaftActorBehavior {
         // There is some peer who thinks it's a leader but is not
         // I will not accept this append entries
         sender.tell(new AppendEntriesReply(
-            context.getTermInformation().getCurrentTerm().get(), false),
+            context.getTermInformation().getCurrentTerm(), false),
             context.getActor());
 
         return suggestedState;
@@ -119,18 +100,6 @@ public class Candidate extends AbstractRaftActorBehavior {
         return suggestedState;
     }
 
-    @Override protected RaftState handleRequestVote(ActorRef sender,
-        RequestVote requestVote, RaftState suggestedState) {
-
-        // We got this RequestVote because the term in there is less than
-        // or equal to our current term, so do not grant the vote
-        sender.tell(new RequestVoteReply(
-            context.getTermInformation().getCurrentTerm().get(), false),
-            context.getActor());
-
-        return suggestedState;
-    }
-
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply, RaftState suggestedState) {
         if(suggestedState == RaftState.Follower) {
@@ -164,45 +133,32 @@ public class Candidate extends AbstractRaftActorBehavior {
                 // to send a message to the candidate
                 return RaftState.Leader;
             }
-            scheduleElection(randomizedDuration());
+            startNewTerm();
+            scheduleElection(electionDuration());
             return state();
         }
         return super.handleMessage(sender, message);
     }
 
-    private FiniteDuration randomizedDuration(){
-        long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
-        return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS);
-    }
-
-    private void scheduleElection(FiniteDuration interval) {
 
+    private void startNewTerm(){
         // set voteCount back to 1 (that is voting for self)
         voteCount = 1;
 
         // Increment the election term and vote for self
-        AtomicLong currentTerm = context.getTermInformation().getCurrentTerm();
-        context.getTermInformation().update(currentTerm.incrementAndGet(), context.getId());
+        long currentTerm = context.getTermInformation().getCurrentTerm();
+        context.getTermInformation().update(currentTerm+1, context.getId());
 
         // Request for a vote
         for(ActorSelection peerActor : peerToActor.values()){
             peerActor.tell(new RequestVote(
-                context.getTermInformation().getCurrentTerm().get(),
-                context.getId(), context.getReplicatedLog().last().getIndex(),
-                context.getReplicatedLog().last().getTerm()),
+                    context.getTermInformation().getCurrentTerm(),
+                    context.getId(), context.getReplicatedLog().last().getIndex(),
+                    context.getReplicatedLog().last().getTerm()),
                 context.getActor());
         }
 
-        if (electionCancel != null && !electionCancel.isCancelled()) {
-            electionCancel.cancel();
-        }
 
-        // Schedule an election. When the scheduler triggers an ElectionTimeout
-        // message is sent to itself
-        electionCancel =
-            context.getActorSystem().scheduler().scheduleOnce(interval,
-                context.getActor(), new ElectionTimeout(),
-                context.getActorSystem().dispatcher(), context.getActor());
     }
 
 }
index 1bdcc8bdb4296c2ceea270ffaa4b261740e1ed00..d93271072ce60f86747a5ef6ff19f825ce1473c5 100644 (file)
@@ -11,9 +11,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
 /**
@@ -30,6 +30,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 public class Follower extends AbstractRaftActorBehavior {
     public Follower(RaftActorContext context) {
         super(context);
+
+        scheduleElection(electionDuration());
     }
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
@@ -42,11 +44,6 @@ public class Follower extends AbstractRaftActorBehavior {
         return suggestedState;
     }
 
-    @Override protected RaftState handleRequestVote(ActorRef sender,
-        RequestVote requestVote, RaftState suggestedState) {
-        return suggestedState;
-    }
-
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply, RaftState suggestedState) {
         return suggestedState;
@@ -57,6 +54,12 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @Override public RaftState handleMessage(ActorRef sender, Object message) {
+        if(message instanceof ElectionTimeout){
+            return RaftState.Candidate;
+        }
+
+        scheduleElection(electionDuration());
+
         return super.handleMessage(sender, message);
     }
 }
index 5c37455be9420db37a4e7ce1dbdc82a479600fe7..0498d7fc8b17c52f60cd2b5b0cc249853a5daca7 100644 (file)
@@ -19,7 +19,6 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -105,11 +104,6 @@ public class Leader extends AbstractRaftActorBehavior {
         return suggestedState;
     }
 
-    @Override protected RaftState handleRequestVote(ActorRef sender,
-        RequestVote requestVote, RaftState suggestedState) {
-        return suggestedState;
-    }
-
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply, RaftState suggestedState) {
         return suggestedState;
@@ -127,11 +121,11 @@ public class Leader extends AbstractRaftActorBehavior {
         if (message instanceof SendHeartBeat) {
             for (ActorSelection follower : followerToActor.values()) {
                 follower.tell(new AppendEntries(
-                    context.getTermInformation().getCurrentTerm().get(),
+                    context.getTermInformation().getCurrentTerm(),
                     context.getId(),
                     context.getReplicatedLog().last().getIndex(),
                     context.getReplicatedLog().last().getTerm(),
-                    Collections.EMPTY_LIST, context.getCommitIndex().get()),
+                    Collections.EMPTY_LIST, context.getCommitIndex()),
                     context.getActor());
             }
             return state();
index c265388f6bb8d5d38f375a6b50930e9299a57387..2a1cf9ab877f3a3ae6cdf0148997732b38516f00 100644 (file)
@@ -13,19 +13,20 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 public class MockRaftActorContext implements RaftActorContext {
 
     private String id;
     private ActorSystem system;
     private ActorRef actor;
-    private AtomicLong index = new AtomicLong(0);
-    private AtomicLong lastApplied = new AtomicLong(0);
+    private long index = 0;
+    private long lastApplied = 0;
     private final ElectionTerm electionTerm;
+    private ReplicatedLog replicatedLog;
 
     public MockRaftActorContext(){
         electionTerm = null;
+
+        initReplicatedLog();
     }
 
     public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
@@ -34,6 +35,16 @@ public class MockRaftActorContext implements RaftActorContext {
         this.actor = actor;
 
         electionTerm = new ElectionTermImpl(id);
+
+        initReplicatedLog();
+    }
+
+
+    public void initReplicatedLog(){
+        MockReplicatedLog mockReplicatedLog = new MockReplicatedLog();
+        this.replicatedLog = mockReplicatedLog;
+        mockReplicatedLog.setLast(new MockReplicatedLogEntry(1,1,""));
+        mockReplicatedLog.setReplicatedLogEntry(new MockReplicatedLogEntry(1,1, ""));
     }
 
     @Override public ActorRef actorOf(Props props) {
@@ -56,50 +67,84 @@ public class MockRaftActorContext implements RaftActorContext {
         return electionTerm;
     }
 
-    public void setIndex(AtomicLong index){
+    public void setIndex(long index){
         this.index = index;
     }
 
-    @Override public AtomicLong getCommitIndex() {
+    @Override public long getCommitIndex() {
         return index;
     }
 
-    public void setLastApplied(AtomicLong lastApplied){
+    @Override public void setCommitIndex(long commitIndex) {
+        this.index = commitIndex;
+    }
+
+    @Override public void setLastApplied(long lastApplied){
         this.lastApplied = lastApplied;
     }
 
-    @Override public AtomicLong getLastApplied() {
+    @Override public long getLastApplied() {
         return lastApplied;
     }
 
+    public void setReplicatedLog(ReplicatedLog replicatedLog) {
+        this.replicatedLog = replicatedLog;
+    }
+
     @Override public ReplicatedLog getReplicatedLog() {
-        return new ReplicatedLog(){
+        return replicatedLog;
+    }
 
-            @Override public ReplicatedLogEntry getReplicatedLogEntry(
-                long index) {
-                throw new UnsupportedOperationException(
-                    "getReplicatedLogEntry");
-            }
+    @Override public ActorSystem getActorSystem() {
+        return this.system;
+    }
+
+
+    public static class MockReplicatedLog implements ReplicatedLog {
+        private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, "");
+        private ReplicatedLogEntry last = new MockReplicatedLogEntry(0,0, "");
 
-            @Override public ReplicatedLogEntry last() {
-                return new ReplicatedLogEntry() {
-                    @Override public Object getData() {
-                        return null;
-                    }
+        @Override public ReplicatedLogEntry getReplicatedLogEntry(long index) {
+            return replicatedLogEntry;
+        }
 
-                    @Override public long getTerm() {
-                        return 1;
-                    }
+        @Override public ReplicatedLogEntry last() {
+            return last;
+        }
 
-                    @Override public long getIndex() {
-                        return 1;
-                    }
-                };
-            }
-        };
+        public void setReplicatedLogEntry(
+            ReplicatedLogEntry replicatedLogEntry) {
+            this.replicatedLogEntry = replicatedLogEntry;
+        }
+
+        public void setLast(ReplicatedLogEntry last) {
+            this.last = last;
+        }
     }
 
-    @Override public ActorSystem getActorSystem() {
-        return this.system;
+    public static class MockReplicatedLogEntry implements ReplicatedLogEntry {
+
+        private final long term;
+        private final long index;
+        private final Object data;
+
+        public MockReplicatedLogEntry(long term, long index, Object data){
+
+            this.term = term;
+            this.index = index;
+            this.data = data;
+        }
+
+        @Override public Object getData() {
+            return data;
+        }
+
+        @Override public long getTerm() {
+            return term;
+        }
+
+        @Override public long getIndex() {
+            return index;
+        }
     }
 }
index e6bf26cdcdd9d8beefbcd1557c096ec24f312736..1e7aa6a1d5fe40e2fd57e3088e80dc0dcbe1d653 100644 (file)
@@ -1,6 +1,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.AbstractActorTest;
@@ -12,12 +13,17 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
 
 public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
+
+    private final ActorRef behaviorActor = getSystem().actorOf(Props.create(
+        DoNothingActor.class));
+
    @Test
     public void testHandlingOfRaftRPCWithNewerTerm() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -45,7 +51,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
             RaftActorContext context =
                 createActorContext();
 
-            ((MockRaftActorContext) context).setLastApplied(new AtomicLong(100));
+            ((MockRaftActorContext) context).setLastApplied(100);
 
             AppendEntries appendEntries =
                 new AppendEntries(100, "leader-1", 0, 0, null, 101);
@@ -53,8 +59,114 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
             RaftState raftState =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
 
-            assertEquals(new AtomicLong(101).get(), context.getLastApplied().get());
+            assertEquals(new AtomicLong(101).get(), context.getLastApplied());
+
+        }};
+    }
+
+    @Test
+    public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermAndSenderLogMoreUpToDate(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    RaftActorBehavior follower = createBehavior(
+                        createActorContext(behaviorActor));
+
+                    follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
+
+                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof RequestVoteReply) {
+                                RequestVoteReply reply = (RequestVoteReply) in;
+                                return reply.isVoteGranted();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get();
+
+                    assertEquals(true, out);
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermButSenderLogLessUptoDate(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    RaftActorContext actorContext =
+                        createActorContext(behaviorActor);
+
+                    MockRaftActorContext.MockReplicatedLog log = new MockRaftActorContext.MockReplicatedLog();
+                    log.setReplicatedLogEntry(new MockRaftActorContext.MockReplicatedLogEntry(20000, 1000000, ""));
+                    log.setLast(
+                        new MockRaftActorContext.MockReplicatedLogEntry(20000,
+                            1000000, ""));
+
+                    ((MockRaftActorContext) actorContext).setReplicatedLog(log);
+
+                    RaftActorBehavior follower = createBehavior(actorContext);
+
+                    follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
 
+                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof RequestVoteReply) {
+                                RequestVoteReply reply = (RequestVoteReply) in;
+                                return reply.isVoteGranted();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get();
+
+                    assertEquals(false, out);
+                }
+            };
+        }};
+    }
+
+
+
+
+    @Test
+    public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    RaftActorContext context = createActorContext(behaviorActor);
+
+                    context.getTermInformation().update(1000, null);
+
+                    RaftActorBehavior follower = createBehavior(context);
+
+                    follower.handleMessage(getTestActor(), new RequestVote(999, "test", 10000, 999));
+
+                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof RequestVoteReply) {
+                                RequestVoteReply reply = (RequestVoteReply) in;
+                                return reply.isVoteGranted();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get();
+
+                    assertEquals(false, out);
+                }
+            };
         }};
     }
 
@@ -76,6 +188,10 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
         return new MockRaftActorContext();
     }
 
+    protected RaftActorContext createActorContext(ActorRef actor) {
+        return new MockRaftActorContext("test", getSystem(), actor);
+    }
+
     protected AppendEntries createAppendEntriesWithNewerTerm(){
         return new AppendEntries(100, "leader-1", 0, 0, null, 1);
     }
@@ -92,4 +208,6 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
         return new RequestVoteReply(100, false);
     }
 
+
+
 }
index 9efdbd7c54a497d6b07ce11977f35b7814a58e99..183c668fca6b5c61366de788d8c16bd28a503754 100644 (file)
@@ -40,11 +40,11 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){
         RaftActorContext raftActorContext = createActorContext();
-        long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm().get();
+        long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm();
 
         new Candidate(raftActorContext, Collections.EMPTY_LIST);
 
-        assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm().get());
+        assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm());
         assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor());
     }
 
@@ -190,6 +190,74 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+    @Test
+    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    RaftActorContext context = createActorContext(getTestActor());
+
+                    context.getTermInformation().update(1000, null);
+
+                    // Once a candidate is created it will immediately increment the current term so after
+                    // construction the currentTerm should be 1001
+                    RaftActorBehavior follower = createBehavior(context);
+
+                    follower.handleMessage(getTestActor(), new RequestVote(1001, "test", 10000, 999));
+
+                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof RequestVoteReply) {
+                                RequestVoteReply reply = (RequestVoteReply) in;
+                                return reply.isVoteGranted();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get();
+
+                    assertEquals(true, out);
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    RaftActorContext context = createActorContext(getTestActor());
+
+                    context.getTermInformation().update(1000, "test");
+
+                    RaftActorBehavior follower = createBehavior(context);
+
+                    follower.handleMessage(getTestActor(), new RequestVote(1001, "candidate", 10000, 999));
+
+                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof RequestVoteReply) {
+                                RequestVoteReply reply = (RequestVoteReply) in;
+                                return reply.isVoteGranted();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get();
+
+                    assertEquals(false, out);
+                }
+            };
+        }};
+    }
+
 
 
     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
@@ -200,8 +268,5 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         return new MockRaftActorContext("test", getSystem(), candidateActor);
     }
 
-    protected RaftActorContext createActorContext(ActorRef candidateActor) {
-        return new MockRaftActorContext("test", getSystem(), candidateActor);
-    }
 
 }
index 5f4f3632cacebef81537e8b22a7a5d9cd5aad664..90acbb1eae08db0eb08e485e7dcd5e1860a80998 100644 (file)
@@ -2,10 +2,19 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
+import static org.junit.Assert.assertEquals;
+
 public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     private final ActorRef followerActor = getSystem().actorOf(Props.create(
@@ -20,4 +29,108 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         return new MockRaftActorContext("test", getSystem(), followerActor);
     }
 
+    @Test
+    public void testThatAnElectionTimeoutIsTriggered(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    Follower follower = new Follower(createActorContext(getTestActor()));
+
+                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof ElectionTimeout) {
+                                return true;
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get();
+
+                    assertEquals(true, out);
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testHandleElectionTimeout(){
+        RaftActorContext raftActorContext = createActorContext();
+        Follower follower =
+            new Follower(raftActorContext);
+
+        RaftState raftState =
+            follower.handleMessage(followerActor, new ElectionTimeout());
+
+        Assert.assertEquals(RaftState.Candidate, raftState);
+    }
+
+    @Test
+    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    RaftActorContext context = createActorContext(getTestActor());
+
+                    context.getTermInformation().update(1000, null);
+
+                    RaftActorBehavior follower = createBehavior(context);
+
+                    follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
+
+                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof RequestVoteReply) {
+                                RequestVoteReply reply = (RequestVoteReply) in;
+                                return reply.isVoteGranted();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get();
+
+                    assertEquals(true, out);
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    RaftActorContext context = createActorContext(getTestActor());
+
+                    context.getTermInformation().update(1000, "test");
+
+                    RaftActorBehavior follower = createBehavior(context);
+
+                    follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
+
+                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof RequestVoteReply) {
+                                RequestVoteReply reply = (RequestVoteReply) in;
+                                return reply.isVoteGranted();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get();
+
+                    assertEquals(false, out);
+                }
+            };
+        }};
+    }
+
 }