* @param currentTerm
* @param votedFor
*/
- void update(AtomicLong currentTerm, String votedFor);
+ void update(long currentTerm, String votedFor);
}
return votedFor;
}
- public void update(AtomicLong currentTerm, String votedFor){
- this.currentTerm = currentTerm;
+ public void update(long currentTerm, String votedFor){
+ this.currentTerm.set(currentTerm);
this.votedFor = votedFor;
// TODO : Write to some persistent state
package org.opendaylight.controller.cluster.raft.behaviors;
+import akka.actor.ActorRef;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
/**
* Abstract class that represents the behavior of a RaftActor
- * <p>
+ * <p/>
* All Servers:
* <ul>
* <li> If commitIndex > lastApplied: increment lastApplied, apply
protected AbstractRaftActorBehavior(RaftActorContext context) {
this.context = context;
}
+
+ /**
+ * Derived classes should not directly handle AppendEntries messages it
+ * should let the base class handle it first. Once the base class handles
+ * the AppendEntries message and does the common actions that are applicable
+ * in all RaftState's it will delegate the handling of the AppendEntries
+ * message to the derived class to do more state specific handling by calling
+ * this method
+ *
+ * @param sender The actor that sent this message
+ * @param appendEntries The AppendEntries message
+ * @param suggestedState The state that the RaftActor should be in based
+ * on the base class's processing of the AppendEntries
+ * message
+ * @return
+ */
+ protected abstract RaftState handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries, RaftState suggestedState);
+
+ /**
+ * Derived classes should not directly handle AppendEntriesReply messages it
+ * should let the base class handle it first. Once the base class handles
+ * the AppendEntriesReply message and does the common actions that are
+ * applicable in all RaftState's it will delegate the handling of the
+ * AppendEntriesReply message to the derived class to do more state specific
+ * handling by calling this method
+ *
+ * @param sender The actor that sent this message
+ * @param appendEntriesReply The AppendEntriesReply message
+ * @param suggestedState The state that the RaftActor should be in based
+ * on the base class's processing of the
+ * AppendEntriesReply message
+ * @return
+ */
+
+ protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
+ AppendEntriesReply appendEntriesReply, RaftState suggestedState);
+
+ /**
+ * Derived classes should not directly handle RequestVote messages it
+ * should let the base class handle it first. Once the base class handles
+ * the RequestVote message and does the common actions that are applicable
+ * in all RaftState's it will delegate the handling of the RequestVote
+ * message to the derived class to do more state specific handling by calling
+ * this method
+ *
+ * @param sender The actor that sent this message
+ * @param requestVote The RequestVote message
+ * @param suggestedState The state that the RaftActor should be in based
+ * on the base class's processing of the RequestVote
+ * message
+ * @return
+ */
+ protected abstract RaftState handleRequestVote(ActorRef sender,
+ RequestVote requestVote, RaftState suggestedState);
+
+ /**
+ * Derived classes should not directly handle RequestVoteReply messages it
+ * should let the base class handle it first. Once the base class handles
+ * the RequestVoteReply message and does the common actions that are
+ * applicable in all RaftState's it will delegate the handling of the
+ * RequestVoteReply message to the derived class to do more state specific
+ * handling by calling this method
+ *
+ * @param sender The actor that sent this message
+ * @param requestVoteReply The RequestVoteReply message
+ * @param suggestedState The state that the RaftActor should be in based
+ * on the base class's processing of the RequestVote
+ * message
+ * @return
+ */
+
+ protected abstract RaftState handleRequestVoteReply(ActorRef sender,
+ RequestVoteReply requestVoteReply, RaftState suggestedState);
+
+ /**
+ * @return The derived class should return the state that corresponds to
+ * it's behavior
+ */
+ protected abstract RaftState state();
+
+ @Override
+ public RaftState handleMessage(ActorRef sender, Object message) {
+ RaftState raftState = state();
+ if (message instanceof RaftRPC) {
+ raftState = applyTerm((RaftRPC) message);
+ }
+ if (message instanceof AppendEntries) {
+ AppendEntries appendEntries = (AppendEntries) message;
+ if (appendEntries.getLeaderCommit() > context.getLastApplied()
+ .get()) {
+ applyLogToStateMachine(appendEntries.getLeaderCommit());
+ }
+ raftState = handleAppendEntries(sender, appendEntries, raftState);
+ } else if (message instanceof AppendEntriesReply) {
+ raftState =
+ handleAppendEntriesReply(sender, (AppendEntriesReply) message,
+ raftState);
+ } else if (message instanceof RequestVote) {
+ raftState =
+ handleRequestVote(sender, (RequestVote) message, raftState);
+ } else if (message instanceof RequestVoteReply) {
+ raftState =
+ handleRequestVoteReply(sender, (RequestVoteReply) message,
+ raftState);
+ }
+ return raftState;
+ }
+
+ private RaftState applyTerm(RaftRPC rpc) {
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()
+ .get()) {
+ context.getTermInformation().update(rpc.getTerm(), null);
+ return RaftState.Follower;
+ }
+ return state();
+ }
+
+ private void applyLogToStateMachine(long index) {
+ context.getLastApplied().set(index);
+ }
}
import akka.actor.ActorRef;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import java.util.List;
this.peers = peers;
}
+ @Override protected RaftState handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState handleRequestVote(ActorRef sender,
+ RequestVote requestVote, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ RequestVoteReply requestVoteReply, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState state() {
+ return RaftState.Candidate;
+ }
+
@Override
public RaftState handleMessage(ActorRef sender, Object message) {
- return RaftState.Candidate;
+ return super.handleMessage(sender, message);
}
}
import akka.actor.ActorRef;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
/**
* The behavior of a RaftActor in the Follower state
+ *
+ * <ul>
+ * <li> Respond to RPCs from candidates and leaders
+ * <li> If election timeout elapses without receiving AppendEntries
+ * RPC from current leader or granting vote to candidate:
+ * convert to candidate
+ * </ul>
+ *
*/
public class Follower extends AbstractRaftActorBehavior {
public Follower(RaftActorContext context) {
super(context);
}
- @Override public RaftState handleMessage(ActorRef sender, Object message) {
+ @Override protected RaftState handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState handleRequestVote(ActorRef sender,
+ RequestVote requestVote, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ RequestVoteReply requestVoteReply, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState state() {
return RaftState.Follower;
}
+
+ @Override public RaftState handleMessage(ActorRef sender, Object message) {
+ return super.handleMessage(sender, message);
+ }
}
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import java.util.Collections;
import java.util.HashMap;
}
+ @Override protected RaftState handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState handleRequestVote(ActorRef sender,
+ RequestVote requestVote, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ RequestVoteReply requestVoteReply, RaftState suggestedState) {
+ return suggestedState;
+ }
+
+ @Override protected RaftState state() {
+ return RaftState.Leader;
+ }
+
@Override public RaftState handleMessage(ActorRef sender, Object message) {
Preconditions.checkNotNull(sender, "sender should not be null");
context.getTermInformation().getCurrentTerm().get() , context.getId(),
context.getReplicatedLog().last().getIndex(),
context.getReplicatedLog().last().getTerm(),
- Collections.EMPTY_LIST), context.getActor());
+ Collections.EMPTY_LIST, context.getCommitIndex().get()), context.getActor());
+ return state();
}
- return RaftState.Leader;
+ return super.handleMessage(sender, message);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+public class AbstractRaftRPC implements RaftRPC {
+ // term
+ protected long term;
+
+ protected AbstractRaftRPC(long term){
+ this.term = term;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+
+}
* Invoked by leader to replicate log entries (§5.3); also used as
* heartbeat (§5.2).
*/
-public class AppendEntries {
- // Leaders term
- private final long term;
-
+public class AppendEntries extends AbstractRaftRPC {
// So that follower can redirect clients
private final String leaderId;
// may send more than one for efficiency)
private final List<Object> entries;
+ // leader's commitIndex
+ private final long leaderCommit;
+
public AppendEntries(long term, String leaderId, long prevLogIndex,
- long prevLogTerm, List<Object> entries) {
- this.term = term;
+ long prevLogTerm, List<Object> entries, long leaderCommit) {
+ super(term);
this.leaderId = leaderId;
this.prevLogIndex = prevLogIndex;
this.prevLogTerm = prevLogTerm;
this.entries = entries;
- }
-
- public long getTerm() {
- return term;
+ this.leaderCommit = leaderCommit;
}
public String getLeaderId() {
public List<Object> getEntries() {
return entries;
}
+
+ public long getLeaderCommit() {
+ return leaderCommit;
+ }
}
/**
* Reply for the AppendEntriesRpc message
*/
-public class AppendEntriesReply {
- // currentTerm, for leader to update itself
- private final long term;
+public class AppendEntriesReply extends AbstractRaftRPC{
// true if follower contained entry matching
// prevLogIndex and prevLogTerm
private final boolean success;
public AppendEntriesReply(long term, boolean success) {
- this.term = term;
+ super(term);
this.success = success;
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+public interface RaftRPC {
+ public long getTerm();
+}
/**
* Invoked by candidates to gather votes (§5.2).
*/
-public class RequestVote {
-
- // candidate’s term
- private final long term;
+public class RequestVote extends AbstractRaftRPC{
// candidate requesting vote
private final String candidateId;
public RequestVote(long term, String candidateId, long lastLogIndex,
long lastLogTerm) {
- this.term = term;
+ super(term);
this.candidateId = candidateId;
this.lastLogIndex = lastLogIndex;
this.lastLogTerm = lastLogTerm;
package org.opendaylight.controller.cluster.raft.messages;
-public class RequestVoteReply {
-
- // currentTerm, for candidate to update itself
- private final long term;
+public class RequestVoteReply extends AbstractRaftRPC{
// true means candidate received vot
private final boolean voteGranted;
public RequestVoteReply(long term, boolean voteGranted) {
- this.term = term;
+ super(term);
this.voteGranted = voteGranted;
}
private String id;
private ActorSystem system;
private ActorRef actor;
+ private AtomicLong index = new AtomicLong(0);
+ private AtomicLong lastApplied = new AtomicLong(0);
public MockRaftActorContext(){
return new ElectionTermImpl(this.id);
}
+ public void setIndex(AtomicLong index){
+ this.index = index;
+ }
+
@Override public AtomicLong getCommitIndex() {
- throw new UnsupportedOperationException("getCommitIndex");
+ return index;
+ }
+
+ public void setLastApplied(AtomicLong lastApplied){
+ this.lastApplied = lastApplied;
}
@Override public AtomicLong getLastApplied() {
- throw new UnsupportedOperationException("getLastApplied");
+ return lastApplied;
}
@Override public ReplicatedLog getReplicatedLog() {
--- /dev/null
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.testkit.JavaTestKit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
+ @Test
+ public void testHandlingOfRaftRPCWithNewerTerm() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ createAppendEntriesWithNewerTerm());
+
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ createAppendEntriesReplyWithNewerTerm());
+
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ createRequestVoteWithNewerTerm());
+
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ createRequestVoteReplyWithNewerTerm());
+
+
+ }};
+ }
+
+ @Test
+ public void testHandlingOfAppendEntriesWithNewerCommitIndex() throws Exception{
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context =
+ new MockRaftActorContext();
+
+ context.setLastApplied(new AtomicLong(100));
+
+ AppendEntries appendEntries =
+ new AppendEntries(100, "leader-1", 0, 0, null, 101);
+
+ RaftState raftState =
+ createBehavior(context).handleMessage(getRef(), appendEntries);
+
+ assertEquals(new AtomicLong(101).get(), context.getLastApplied().get());
+
+ }};
+ }
+
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
+ ActorRef actorRef, RaftRPC rpc){
+ RaftState raftState = createBehavior()
+ .handleMessage(actorRef, rpc);
+
+ assertEquals(RaftState.Follower, raftState);
+ }
+
+ protected abstract RaftActorBehavior createBehavior(RaftActorContext actorContext);
+
+ protected RaftActorBehavior createBehavior(){
+ return createBehavior(new MockRaftActorContext());
+ }
+
+ protected AppendEntries createAppendEntriesWithNewerTerm(){
+ return new AppendEntries(100, "leader-1", 0, 0, null, 1);
+ }
+
+ protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm(){
+ return new AppendEntriesReply(100, false);
+ }
+
+ protected RequestVote createRequestVoteWithNewerTerm(){
+ return new RequestVote(100, "candidate-1", 10, 100);
+ }
+
+ protected RequestVoteReply createRequestVoteReplyWithNewerTerm(){
+ return new RequestVoteReply(100, false);
+ }
+
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+
+import java.util.Collections;
+
+public class CandidateTest extends AbstractRaftActorBehaviorTest {
+
+ @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+ return new Candidate(actorContext, Collections.EMPTY_LIST);
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+
+public class FollowerTest extends AbstractRaftActorBehaviorTest {
+ @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+ return new Follower(actorContext);
+ }
+}
import akka.testkit.JavaTestKit;
import junit.framework.Assert;
import org.junit.Test;
-import org.opendaylight.controller.cluster.raft.AbstractActorTest;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import static org.junit.Assert.assertEquals;
-public class LeaderTest extends AbstractActorTest {
+public class LeaderTest extends AbstractRaftActorBehaviorTest {
@Test
public void testHandleMessageForUnknownMessage() throws Exception {
};
}};
}
+
+ @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+ return new Leader(actorContext, Collections.EMPTY_LIST);
+ }
}