AppendEntries appendEntries, RaftState suggestedState);
+ /**
+ * appendEntries first processes the AppendEntries message and then
+ * delegates handling to a specific behavior
+ *
+ * @param sender
+ * @param appendEntries
+ * @param raftState
+ * @return
+ */
protected RaftState appendEntries(ActorRef sender,
AppendEntries appendEntries, RaftState raftState) {
* AppendEntriesReply message
* @return
*/
-
protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply, RaftState suggestedState);
+ /**
+ * requestVote handles the RequestVote message. This logic is common
+ * for all behaviors
+ *
+ * @param sender
+ * @param requestVote
+ * @param suggestedState
+ * @return
+ */
protected RaftState requestVote(ActorRef sender,
RequestVote requestVote, RaftState suggestedState) {
* message
* @return
*/
-
protected abstract RaftState handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply, RaftState suggestedState);
+ /**
+ * Creates a random election duration
+ *
+ * @return
+ */
protected FiniteDuration electionDuration() {
long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
TimeUnit.MILLISECONDS);
}
+ /**
+ * stop the scheduled election
+ */
protected void stopElection() {
if (electionCancel != null && !electionCancel.isCancelled()) {
electionCancel.cancel();
}
}
+ /**
+ * schedule a new election
+ *
+ * @param interval
+ */
protected void scheduleElection(FiniteDuration interval) {
-
stopElection();
// Schedule an election. When the scheduler triggers an ElectionTimeout
context.getActorSystem().dispatcher(), context.getActor());
}
+ /**
+ * Get the current term
+ * @return
+ */
protected long currentTerm() {
return context.getTermInformation().getCurrentTerm();
}
+ /**
+ * Get the candidate for whom we voted in the current term
+ * @return
+ */
protected String votedFor() {
return context.getTermInformation().getVotedFor();
}
+ /**
+ * Get the actor associated with this behavior
+ * @return
+ */
protected ActorRef actor() {
return context.getActor();
}
+ /**
+ * Get the term from the last entry in the log
+ *
+ * @return
+ */
protected long lastTerm() {
return context.getReplicatedLog().lastTerm();
}
+ /**
+ * Get the index from the last entry in the log
+ *
+ * @return
+ */
protected long lastIndex() {
return context.getReplicatedLog().lastIndex();
}
+ /**
+ * Find the client request tracker for a specific logIndex
+ *
+ * @param logIndex
+ * @return
+ */
protected ClientRequestTracker findClientRequestTracker(long logIndex) {
return null;
}
+ /**
+ * Find the log index from the previous to last entry in the log
+ *
+ * @return
+ */
+ protected long prevLogIndex(long index){
+ ReplicatedLogEntry prevEntry =
+ context.getReplicatedLog().get(index - 1);
+ if (prevEntry != null) {
+ return prevEntry.getIndex();
+ }
+ return -1;
+ }
+
+ /**
+ * Find the log term from the previous to last entry in the log
+ * @return
+ */
+ protected long prevLogTerm(long index){
+ ReplicatedLogEntry prevEntry =
+ context.getReplicatedLog().get(index - 1);
+ if (prevEntry != null) {
+ return prevEntry.getTerm();
+ }
+ return -1;
+ }
+
+ /**
+ * Apply the provided index to the state machine
+ *
+ * @param index a log index that is known to be committed
+ */
protected void applyLogToStateMachine(long index) {
// Now maybe we apply to the state machine
for (long i = context.getLastApplied() + 1;
public Leader(RaftActorContext context) {
super(context);
- if(lastIndex() >= 0) {
+ if (lastIndex() >= 0) {
context.setCommitIndex(lastIndex());
}
}
}
- if (replicatedCount >= minReplicationCount){
+ if (replicatedCount >= minReplicationCount) {
ReplicatedLogEntry replicatedLogEntry =
context.getReplicatedLog().get(N);
if (replicatedLogEntry != null
}
}
- if(context.getCommitIndex() > context.getLastApplied()){
+ // Apply the change to the state machine
+ if (context.getCommitIndex() > context.getLastApplied()) {
applyLogToStateMachine(context.getCommitIndex());
}
if (message instanceof SendHeartBeat) {
return sendHeartBeat();
} else if (message instanceof Replicate) {
-
- Replicate replicate = (Replicate) message;
- long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
- context.getLogger().debug("Replicate message " + logIndex);
-
- if (followerToActor.size() == 0) {
- context.setCommitIndex(
- replicate.getReplicatedLogEntry().getIndex());
-
- context.getActor()
- .tell(new ApplyState(replicate.getClientActor(),
- replicate.getIdentifier(),
- replicate.getReplicatedLogEntry()),
- context.getActor()
- );
- } else {
-
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
- ReplicatedLogEntry prevEntry =
- context.getReplicatedLog().get(lastIndex() - 1);
- long prevLogIndex = -1;
- long prevLogTerm = -1;
- if (prevEntry != null) {
- prevLogIndex = prevEntry.getIndex();
- prevLogTerm = prevEntry.getTerm();
- }
- // Send an AppendEntries to all followers
- for (String followerId : followerToActor.keySet()) {
- ActorSelection followerActor =
- followerToActor.get(followerId);
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex, prevLogTerm,
- context.getReplicatedLog().getFrom(
- followerLogInformation.getNextIndex()
- .get()
- ), context.getCommitIndex()
- ),
- actor()
- );
- }
- }
+ replicate((Replicate) message);
}
} finally {
scheduleHeartBeat(HEART_BEAT_INTERVAL);
return super.handleMessage(sender, message);
}
- private RaftState sendHeartBeat() {
- if (followerToActor.size() > 0) {
- for (String follower : followerToActor.keySet()) {
-
- FollowerLogInformation followerLogInformation =
- followerToLog.get(follower);
+ private void replicate(Replicate replicate) {
+ long logIndex = replicate.getReplicatedLogEntry().getIndex();
- AtomicLong nextIndex =
- followerLogInformation.getNextIndex();
+ context.getLogger().debug("Replicate message " + logIndex);
- List<ReplicatedLogEntry> entries =
- context.getReplicatedLog().getFrom(nextIndex.get());
+ if (followerToActor.size() == 0) {
+ context.setCommitIndex(
+ replicate.getReplicatedLogEntry().getIndex());
- followerToActor.get(follower).tell(new AppendEntries(
- context.getTermInformation().getCurrentTerm(),
- context.getId(),
- context.getReplicatedLog().lastIndex(),
- context.getReplicatedLog().lastTerm(),
- entries, context.getCommitIndex()),
+ context.getActor()
+ .tell(new ApplyState(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ replicate.getReplicatedLogEntry()),
context.getActor()
);
- }
+ } else {
+
+ // Create a tracker entry we will use this later to notify the
+ // client actor
+ trackerList.add(
+ new ClientRequestTrackerImpl(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ logIndex)
+ );
+
+ sendAppendEntries();
+ }
+ }
+
+ private void sendAppendEntries() {
+ // Send an AppendEntries to all followers
+ for (String followerId : followerToActor.keySet()) {
+ ActorSelection followerActor =
+ followerToActor.get(followerId);
+
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ List<ReplicatedLogEntry> entries =
+ context.getReplicatedLog().getFrom(nextIndex);
+
+ followerActor.tell(
+ new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(nextIndex), prevLogTerm(nextIndex),
+ entries, context.getCommitIndex()
+ ),
+ actor()
+ );
+ }
+ }
+
+ private RaftState sendHeartBeat() {
+ if (followerToActor.size() > 0) {
+ sendAppendEntries();
}
return state();
}
}
private void scheduleHeartBeat(FiniteDuration interval) {
+ if(followerToActor.keySet().size() == 0){
+ // Optimization - do not bother scheduling a heartbeat as there are
+ // no followers
+ return;
+ }
+
stopHeartBeat();
// Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
public class LeaderTest extends AbstractRaftActorBehaviorTest {
- private ActorRef leaderActor = getSystem().actorOf(Props.create(DoNothingActor.class));
- private ActorRef senderActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+ private ActorRef leaderActor =
+ getSystem().actorOf(Props.create(DoNothingActor.class));
+ private ActorRef senderActor =
+ getSystem().actorOf(Props.create(DoNothingActor.class));
@Test
public void testHandleMessageForUnknownMessage() throws Exception {
@Test
- public void testThatLeaderSendsAHeartbeatMessageToAllFollowers(){
+ public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
ActorRef followerActor = getTestActor();
- MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
Map<String, String> peerAddresses = new HashMap();
- peerAddresses.put(followerActor.path().toString(), followerActor.path().toString());
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
actorContext.setPeerAddresses(peerAddresses);
Leader leader = new Leader(actorContext);
leader.handleMessage(senderActor, new SendHeartBeat());
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- protected String match(Object in) {
- if (in instanceof AppendEntries) {
- if (((AppendEntries) in).getTerm()
- == 0) {
- return "match";
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"),
+ "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof AppendEntries) {
+ if (((AppendEntries) in).getTerm()
+ == 0) {
+ return "match";
+ }
+ return null;
+ } else {
+ throw noMatch();
}
- return null;
- } else {
- throw noMatch();
}
- }
- }.get(); // this extracts the received message
+ }.get(); // this extracts the received message
assertEquals("match", out);
}};
}
- @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+ @Test
+ public void testHandleReplicateMessageSendAppendEntriesToFollower() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ ActorRef followerActor = getTestActor();
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ Map<String, String> peerAddresses = new HashMap();
+
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ actorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(actorContext);
+ RaftState raftState = leader
+ .handleMessage(senderActor, new Replicate(null, null,
+ new MockRaftActorContext.MockReplicatedLogEntry(1,
+ 100,
+ "foo")
+ ));
+
+ // State should not change
+ assertEquals(RaftState.Leader, raftState);
+
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"),
+ "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof AppendEntries) {
+ if (((AppendEntries) in).getTerm()
+ == 0) {
+ return "match";
+ }
+ return null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ }
+
+
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleReplicateMessageWhenThereAreNoFollowers() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ ActorRef raftActor = getTestActor();
+
+ MockRaftActorContext actorContext =
+ new MockRaftActorContext("test", getSystem(), raftActor);
+
+ Leader leader = new Leader(actorContext);
+ RaftState raftState = leader
+ .handleMessage(senderActor, new Replicate(null, "state-id",
+ new MockRaftActorContext.MockReplicatedLogEntry(1,
+ 100,
+ "foo")
+ ));
+
+ // State should not change
+ assertEquals(RaftState.Leader, raftState);
+
+ assertEquals(100, actorContext.getCommitIndex());
+
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"),
+ "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof ApplyState) {
+ if (((ApplyState) in).getIdentifier().equals("state-id")) {
+ return "match";
+ }
+ return null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ }
+
+
+ };
+ }};
+ }
+
+ @Override protected RaftActorBehavior createBehavior(
+ RaftActorContext actorContext) {
return new Leader(actorContext);
}