package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.AbstractActorTest;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import org.slf4j.LoggerFactory;
public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
}};
}
- /**
- * This test verifies that when an AppendEntries RPC is received by a RaftActor
- * with a commitIndex that is greater than what has been applied to the
- * state machine of the RaftActor, the RaftActor applies the state and
- * sets it current applied state to the commitIndex of the sender.
- *
- * @throws Exception
- */
- @Test
- public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
- new JavaTestKit(getSystem()) {{
-
- RaftActorContext context =
- createActorContext();
-
- context.setLastApplied(100);
- setLastLogEntry((MockRaftActorContext) context, 0, 0, "");
-
- // The new commitIndex is 101
- AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
-
- RaftState raftState =
- createBehavior(context).handleMessage(getRef(), appendEntries);
-
- assertEquals(101L, context.getLastApplied());
-
- }};
- }
/**
* This test verifies that when an AppendEntries is received with a term that
throws Exception {
new JavaTestKit(getSystem()) {{
- MockRaftActorContext context = (MockRaftActorContext)
- createActorContext();
+ MockRaftActorContext context = createActorContext();
// First set the receivers term to a high number (1000)
context.getTermInformation().update(1000, "test");
AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
+ new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
RaftActorBehavior behavior = createBehavior(context);
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
+ assertEquals(expected, raftBehavior);
// Also expect an AppendEntriesReply to be sent where success is false
final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
"AppendEntriesReply") {
// do not put code outside this method, will run afterwards
+ @Override
protected Boolean match(Object in) {
if (in instanceof AppendEntriesReply) {
AppendEntriesReply reply = (AppendEntriesReply) in;
}};
}
- /**
- * This test verifies that when an AppendEntries is received a specific prevLogTerm
- * which does not match the term that is in RaftActors log entry at prevLogIndex
- * then the RaftActor does not change it's state and it returns a failure.
- *
- * @throws Exception
- */
- @Test
- public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
- throws Exception {
- new JavaTestKit(getSystem()) {{
-
- MockRaftActorContext context = (MockRaftActorContext)
- createActorContext();
-
- // First set the receivers term to lower number
- context.getTermInformation().update(95, "test");
-
- // Set the last log entry term for the receiver to be greater than
- // what we will be sending as the prevLogTerm in AppendEntries
- MockRaftActorContext.MockReplicatedLog mockReplicatedLog =
- setLastLogEntry(context, 20, 0, "");
-
- // Also set the entry at index 0 with term 20 which will be greater
- // than the prevLogTerm sent by the sender
- mockReplicatedLog.setReplicatedLogEntry(
- new MockRaftActorContext.MockReplicatedLogEntry(20, 0, ""));
-
- // AppendEntries is now sent with a bigger term
- // this will set the receivers term to be the same as the sender's term
- AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
-
- RaftActorBehavior behavior = createBehavior(context);
-
- // Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
-
- RaftState raftState =
- behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
-
- // Also expect an AppendEntriesReply to be sent where success is false
- final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
- "AppendEntriesReply") {
- // do not put code outside this method, will run afterwards
- protected Boolean match(Object in) {
- if (in instanceof AppendEntriesReply) {
- AppendEntriesReply reply = (AppendEntriesReply) in;
- return reply.isSuccess();
- } else {
- throw noMatch();
- }
- }
- }.get();
-
- assertEquals(false, out);
-
-
- }};
- }
-
- /**
- * This test verifies that when a new AppendEntries message is received with
- * new entries and the logs of the sender and receiver match that the new
- * entries get added to the log and the log is incremented by the number of
- * entries received in appendEntries
- *
- * @throws Exception
- */
@Test
- public void testHandleAppendEntriesAddNewEntries() throws Exception {
- new JavaTestKit(getSystem()) {{
-
- MockRaftActorContext context = (MockRaftActorContext)
- createActorContext();
-
- // First set the receivers term to lower number
- context.getTermInformation().update(1, "test");
-
- // Prepare the receivers log
- MockRaftActorContext.SimpleReplicatedLog log =
- new MockRaftActorContext.SimpleReplicatedLog();
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
-
- context.setReplicatedLog(log);
-
- // Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 3, "three"));
- entries.add(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 4, "four"));
-
- // Send appendEntries with the same term as was set on the receiver
- // before the new behavior was created (1 in this case)
- // This will not work for a Candidate because as soon as a Candidate
- // is created it increments the term
- AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 2, 1, entries, 101);
-
- RaftActorBehavior behavior = createBehavior(context);
-
- if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
- // Resetting the Candidates term to make sure it will match
- // the term sent by AppendEntries. If this was not done then
- // the test will fail because the Candidate will assume that
- // the message was sent to it from a lower term peer and will
- // thus respond with a failure
- context.getTermInformation().update(1, "test");
- }
-
- // Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
-
- RaftState raftState =
- behavior.handleMessage(getRef(), appendEntries);
-
- assertEquals(expected, raftState);
- assertEquals(5, log.last().getIndex() + 1);
- assertNotNull(log.get(3));
- assertNotNull(log.get(4));
-
- // Also expect an AppendEntriesReply to be sent where success is false
- final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
- "AppendEntriesReply") {
- // do not put code outside this method, will run afterwards
- protected Boolean match(Object in) {
- if (in instanceof AppendEntriesReply) {
- AppendEntriesReply reply = (AppendEntriesReply) in;
- return reply.isSuccess();
- } else {
- throw noMatch();
- }
- }
- }.get();
+ public void testHandleAppendEntriesAddSameEntryToLog(){
+ new JavaTestKit(getSystem()) {
+ {
- assertEquals(true, out);
+ MockRaftActorContext context = createActorContext();
+ // First set the receivers term to lower number
+ context.getTermInformation().update(2, "test");
- }};
- }
-
- /**
- * This test verifies that when a new AppendEntries message is received with
- * new entries and the logs of the sender and receiver are out-of-sync that
- * the log is first corrected by removing the out of sync entries from the
- * log and then adding in the new entries sent with the AppendEntries message
- *
- * @throws Exception
- */
- @Test
- public void testHandleAppendEntriesCorrectReceiverLogEntries()
- throws Exception {
- new JavaTestKit(getSystem()) {{
-
- MockRaftActorContext context = (MockRaftActorContext)
- createActorContext();
-
- // First set the receivers term to lower number
- context.getTermInformation().update(2, "test");
-
- // Prepare the receivers log
- MockRaftActorContext.SimpleReplicatedLog log =
- new MockRaftActorContext.SimpleReplicatedLog();
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
- log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
-
- context.setReplicatedLog(log);
-
- // Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(
- new MockRaftActorContext.MockReplicatedLogEntry(2, 2, "two-1"));
- entries.add(
- new MockRaftActorContext.MockReplicatedLogEntry(2, 3, "three"));
-
- // Send appendEntries with the same term as was set on the receiver
- // before the new behavior was created (1 in this case)
- // This will not work for a Candidate because as soon as a Candidate
- // is created it increments the term
- AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", 1, 1, entries, 101);
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
- RaftActorBehavior behavior = createBehavior(context);
+ context.setReplicatedLog(log);
- if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
- // Resetting the Candidates term to make sure it will match
- // the term sent by AppendEntries. If this was not done then
- // the test will fail because the Candidate will assume that
- // the message was sent to it from a lower term peer and will
- // thus respond with a failure
- context.getTermInformation().update(2, "test");
- }
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
- // Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ AppendEntries appendEntries =
+ new AppendEntries(2, "leader-1", -1, 1, entries, 0, -1);
- RaftState raftState =
- behavior.handleMessage(getRef(), appendEntries);
+ RaftActorBehavior behavior = createBehavior(context);
- assertEquals(expected, raftState);
+ if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
+ // Resetting the Candidates term to make sure it will match
+ // the term sent by AppendEntries. If this was not done then
+ // the test will fail because the Candidate will assume that
+ // the message was sent to it from a lower term peer and will
+ // thus respond with a failure
+ context.getTermInformation().update(2, "test");
+ }
- // The entry at index 2 will be found out-of-sync with the leader
- // and will be removed
- // Then the two new entries will be added to the log
- // Thus making the log to have 4 entries
- assertEquals(4, log.last().getIndex() + 1);
- assertNotNull(log.get(2));
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- // Check that the entry at index 2 has the new data
- assertEquals("two-1", log.get(2).getData());
- assertNotNull(log.get(3));
+ RaftActorBehavior raftBehavior =
+ behavior.handleMessage(getRef(), appendEntries);
- // Also expect an AppendEntriesReply to be sent where success is false
- final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
- "AppendEntriesReply") {
- // do not put code outside this method, will run afterwards
- protected Boolean match(Object in) {
- if (in instanceof AppendEntriesReply) {
- AppendEntriesReply reply = (AppendEntriesReply) in;
- return reply.isSuccess();
- } else {
- throw noMatch();
- }
- }
- }.get();
+ assertEquals(expected, raftBehavior);
- assertEquals(true, out);
+ assertEquals(1, log.size());
- }};
+ }};
}
/**
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
- RaftActorBehavior follower = createBehavior(
+ RaftActorBehavior behavior = createBehavior(
createActorContext(behaviorActor));
- follower.handleMessage(getTestActor(),
+ RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
new RequestVote(1000, "test", 10000, 999));
- final Boolean out =
- new ExpectMsg<Boolean>(duration("1 seconds"),
- "RequestVoteReply") {
- // do not put code outside this method, will run afterwards
- protected Boolean match(Object in) {
- if (in instanceof RequestVoteReply) {
- RequestVoteReply reply =
- (RequestVoteReply) in;
- return reply.isVoteGranted();
- } else {
- throw noMatch();
+ if(!(behavior instanceof Follower)){
+ assertTrue(raftBehavior instanceof Follower);
+ } else {
+
+ final Boolean out =
+ new ExpectMsg<Boolean>(duration("1 seconds"),
+ "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ @Override
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply =
+ (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
}
- }
- }.get();
+ }.get();
- assertEquals(true, out);
+ assertEquals(true, out);
+ }
}
};
}};
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
RaftActorContext actorContext =
createActorContext(behaviorActor);
- MockRaftActorContext.MockReplicatedLog
- log = new MockRaftActorContext.MockReplicatedLog();
- log.setReplicatedLogEntry(
+ MockRaftActorContext.SimpleReplicatedLog
+ log = new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
new MockRaftActorContext.MockReplicatedLogEntry(20000,
- 1000000, ""));
- log.setLast(
- new MockRaftActorContext.MockReplicatedLogEntry(20000,
- 1000000, "")
- );
+ 1000000, new MockRaftActorContext.MockPayload("")));
((MockRaftActorContext) actorContext).setReplicatedLog(log);
- RaftActorBehavior follower = createBehavior(actorContext);
+ RaftActorBehavior behavior = createBehavior(actorContext);
- follower.handleMessage(getTestActor(),
+ RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
new RequestVote(1000, "test", 10000, 999));
- final Boolean out =
- new ExpectMsg<Boolean>(duration("1 seconds"),
- "RequestVoteReply") {
- // do not put code outside this method, will run afterwards
- protected Boolean match(Object in) {
- if (in instanceof RequestVoteReply) {
- RequestVoteReply reply =
- (RequestVoteReply) in;
- return reply.isVoteGranted();
- } else {
- throw noMatch();
+ if(!(behavior instanceof Follower)){
+ assertTrue(raftBehavior instanceof Follower);
+ } else {
+ final Boolean out =
+ new ExpectMsg<Boolean>(duration("1 seconds"),
+ "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ @Override
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply =
+ (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
}
- }
- }.get();
+ }.get();
- assertEquals(false, out);
+ assertEquals(false, out);
+ }
}
};
}};
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
RaftActorContext context =
new ExpectMsg<Boolean>(duration("1 seconds"),
"RequestVoteReply") {
// do not put code outside this method, will run afterwards
+ @Override
protected Boolean match(Object in) {
if (in instanceof RequestVoteReply) {
RequestVoteReply reply =
}};
}
+ @Test
+ public void testPerformSnapshot() {
+ MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
+ AbstractRaftActorBehavior abstractBehavior = (AbstractRaftActorBehavior) createBehavior(context);
+ if (abstractBehavior instanceof Candidate) {
+ return;
+ }
+
+ context.getTermInformation().update(1, "test");
+
+ //log has 1 entry with replicatedToAllIndex = 0, does not do anything, returns the
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ context.setLastApplied(0);
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(1, context.getReplicatedLog().size());
+
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ context.setLastApplied(0);
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(2, context.getReplicatedLog().size());
+
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ context.setLastApplied(1);
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(0, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(1, context.getReplicatedLog().size());
+
+ //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 5, 1).build());
+ context.setLastApplied(2);
+ abstractBehavior.performSnapshotWithoutCapture(3);
+ assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(3, context.getReplicatedLog().size());
+
+ // scenario where Last applied > Replicated to all index (becoz of a slow follower)
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ context.setLastApplied(2);
+ abstractBehavior.performSnapshotWithoutCapture(1);
+ assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(1, context.getReplicatedLog().size());
+ }
+
+
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
ActorRef actorRef, RaftRPC rpc) {
RaftActorContext actorContext = createActorContext();
+ Payload p = new MockRaftActorContext.MockPayload("");
setLastLogEntry(
- (MockRaftActorContext) actorContext, 0, 0, "");
+ (MockRaftActorContext) actorContext, 0, 0, p);
- RaftState raftState = createBehavior(actorContext)
+ RaftActorBehavior raftBehavior = createBehavior(actorContext)
.handleMessage(actorRef, rpc);
- assertEquals(RaftState.Follower, raftState);
+ assertTrue(raftBehavior instanceof Follower);
}
- protected MockRaftActorContext.MockReplicatedLog setLastLogEntry(
- MockRaftActorContext actorContext, long term, long index, Object data) {
+ protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
+ MockRaftActorContext actorContext, long term, long index, Payload data) {
return setLastLogEntry(actorContext,
new MockRaftActorContext.MockReplicatedLogEntry(term, index, data));
}
- protected MockRaftActorContext.MockReplicatedLog setLastLogEntry(
+ protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
MockRaftActorContext actorContext, ReplicatedLogEntry logEntry) {
- MockRaftActorContext.MockReplicatedLog
- log = new MockRaftActorContext.MockReplicatedLog();
- // By default MockReplicateLog has last entry set to (1,1,"")
- log.setLast(logEntry);
+ MockRaftActorContext.SimpleReplicatedLog
+ log = new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(logEntry);
actorContext.setReplicatedLog(log);
return log;
return createBehavior(createActorContext());
}
- protected RaftActorContext createActorContext() {
+ protected MockRaftActorContext createActorContext() {
return new MockRaftActorContext();
}
- protected RaftActorContext createActorContext(ActorRef actor) {
+ protected MockRaftActorContext createActorContext(ActorRef actor) {
return new MockRaftActorContext("test", getSystem(), actor);
}
protected AppendEntries createAppendEntriesWithNewerTerm() {
- return new AppendEntries(100, "leader-1", 0, 0, null, 1);
+ return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
}
protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
- return new AppendEntriesReply(100, false);
+ return new AppendEntriesReply("follower-1", 100, false, 100, 100);
}
protected RequestVote createRequestVoteWithNewerTerm() {
return new RequestVoteReply(100, false);
}
+ protected Object fromSerializableMessage(Object serializable){
+ return SerializationUtils.fromSerializable(serializable);
+ }
+ protected ByteString toByteString(Map<String, String> state) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try(ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+ oos.writeObject(state);
+ return ByteString.copyFrom(bos.toByteArray());
+ } catch (IOException e) {
+ throw new AssertionError("IOException occurred converting Map to Bytestring", e);
+ }
+ }
+ protected void logStart(String name) {
+ LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
+ }
}