package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
import akka.actor.ActorRef;
-import akka.testkit.JavaTestKit;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.junit.After;
+import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.AbstractActorTest;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicLong;
+public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
-import static org.junit.Assert.assertEquals;
+ protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+ private final TestActorRef<MessageCollectorActor> behaviorActor = actorFactory.createTestActor(
+ Props.create(MessageCollectorActor.class), actorFactory.generateActorId("behavior"));
+
+ RaftActorBehavior behavior;
+
+ @After
+ public void tearDown() throws Exception {
+ if(behavior != null) {
+ behavior.close();
+ }
-public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
- @Test
- public void testHandlingOfRaftRPCWithNewerTerm() throws Exception {
- new JavaTestKit(getSystem()) {{
+ actorFactory.close();
+ }
+
+ /**
+ * This test checks that when a new Raft RPC message is received with a newer
+ * term the RaftActor gets into the Follower state.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleRaftRPCWithNewerTerm() throws Exception {
+ RaftActorContext actorContext = createActorContext();
- assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
createAppendEntriesWithNewerTerm());
- assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
createAppendEntriesReplyWithNewerTerm());
- assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
createRequestVoteWithNewerTerm());
- assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
createRequestVoteReplyWithNewerTerm());
+ }
+
+
+ /**
+ * This test verifies that when an AppendEntries is received with a term that
+ * is less that the currentTerm of the RaftActor then the RaftActor does not
+ * change it's state and it responds back with a failure
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() throws Exception {
+ MockRaftActorContext context = createActorContext();
+
+ // First set the receivers term to a high number (1000)
+ context.getTermInformation().update(1000, "test");
+
+ AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
+
+ behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
+
+ RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
+
+ assertEquals("Raft state", expected.state(), raftBehavior.state());
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
+ behaviorActor, AppendEntriesReply.class);
+
+ assertEquals("isSuccess", false, reply.isSuccess());
+ }
+
+
+ @Test
+ public void testHandleAppendEntriesAddSameEntryToLog() throws Exception {
+ MockRaftActorContext context = createActorContext();
+
+ context.getTermInformation().update(2, "test");
+
+ // Prepare the receivers log
+ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("zero");
+ setLastLogEntry(context, 2, 0, payload);
+
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(new MockRaftActorContext.MockReplicatedLogEntry(2, 0, payload));
+
+ AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1);
+
+ behavior = createBehavior(context);
+
+ if (behavior instanceof Candidate) {
+ // Resetting the Candidates term to make sure it will match
+ // the term sent by AppendEntries. If this was not done then
+ // the test will fail because the Candidate will assume that
+ // the message was sent to it from a lower term peer and will
+ // thus respond with a failure
+ context.getTermInformation().update(2, "test");
+ }
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
+
+ RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
+
+ assertEquals("Raft state", expected.state(), raftBehavior.state());
+
+ assertEquals("ReplicatedLog size", 1, context.getReplicatedLog().size());
+
+ handleAppendEntriesAddSameEntryToLogReply(behaviorActor);
+ }
+
+ protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
+ throws Exception {
+ AppendEntriesReply reply = MessageCollectorActor.getFirstMatching(replyActor, AppendEntriesReply.class);
+ Assert.assertNull("Expected no AppendEntriesReply", reply);
+ }
+ /**
+ * This test verifies that when a RequestVote is received by the RaftActor
+ * with the senders' log is more up to date than the receiver that the receiver grants
+ * the vote to the sender.
+ */
+ @Test
+ public void testHandleRequestVoteWhenSenderLogMoreUpToDate() {
+ MockRaftActorContext context = createActorContext();
+
+ behavior = createBehavior(context);
+
+ context.getTermInformation().update(1, "test");
- }};
+ behavior.handleMessage(behaviorActor, new RequestVote(context.getTermInformation().getCurrentTerm(),
+ "test", 10000, 999));
+
+ RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(behaviorActor,
+ RequestVoteReply.class);
+ assertEquals("isVoteGranted", true, reply.isVoteGranted());
}
+ /**
+ * This test verifies that when a RaftActor receives a RequestVote message
+ * with a term that is greater than it's currentTerm but a less up-to-date
+ * log then the receiving RaftActor will not grant the vote to the sender
+ */
@Test
- public void testHandlingOfAppendEntriesWithNewerCommitIndex() throws Exception{
- new JavaTestKit(getSystem()) {{
+ public void testHandleRequestVoteWhenSenderLogLessUptoDate() {
+ MockRaftActorContext context = createActorContext();
+
+ behavior = createBehavior(context);
+
+ context.getTermInformation().update(1, "test");
+
+ int index = 2000;
+ setLastLogEntry(context, context.getTermInformation().getCurrentTerm(), index,
+ new MockRaftActorContext.MockPayload(""));
+
+ behavior.handleMessage(behaviorActor, new RequestVote(
+ context.getTermInformation().getCurrentTerm(), "test",
+ index - 1, context.getTermInformation().getCurrentTerm()));
+
+ RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(behaviorActor,
+ RequestVoteReply.class);
+ assertEquals("isVoteGranted", false, reply.isVoteGranted());
+ }
- MockRaftActorContext context =
- new MockRaftActorContext();
- context.setLastApplied(new AtomicLong(100));
- AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
+ /**
+ * This test verifies that the receiving RaftActor will not grant a vote
+ * to a sender if the sender's term is lesser than the currentTerm of the
+ * recipient RaftActor
+ */
+ @Test
+ public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm() {
+ RaftActorContext context = createActorContext();
+
+ context.getTermInformation().update(1000, null);
- RaftState raftState =
- createBehavior(context).handleMessage(getRef(), appendEntries);
+ behavior = createBehavior(context);
- assertEquals(new AtomicLong(101).get(), context.getLastApplied().get());
+ behavior.handleMessage(behaviorActor, new RequestVote(999, "test", 10000, 999));
- }};
+ RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(behaviorActor,
+ RequestVoteReply.class);
+ assertEquals("isVoteGranted", false, reply.isVoteGranted());
}
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
- ActorRef actorRef, RaftRPC rpc){
- RaftState raftState = createBehavior()
- .handleMessage(actorRef, rpc);
+ @Test
+ public void testPerformSnapshot() {
+ MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
+ AbstractRaftActorBehavior abstractBehavior = (AbstractRaftActorBehavior) createBehavior(context);
+ if (abstractBehavior instanceof Candidate) {
+ return;
+ }
+
+ context.getTermInformation().update(1, "test");
+
+ //log has 1 entry with replicatedToAllIndex = 0, does not do anything, returns the
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ context.setLastApplied(0);
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(1, context.getReplicatedLog().size());
- assertEquals(RaftState.Follower, raftState);
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ context.setLastApplied(0);
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(2, context.getReplicatedLog().size());
+
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ context.setLastApplied(1);
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(0, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(1, context.getReplicatedLog().size());
+
+ //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 5, 1).build());
+ context.setLastApplied(2);
+ abstractBehavior.performSnapshotWithoutCapture(3);
+ assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(3, context.getReplicatedLog().size());
+
+ // scenario where Last applied > Replicated to all index (becoz of a slow follower)
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ context.setLastApplied(2);
+ abstractBehavior.performSnapshotWithoutCapture(1);
+ assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(1, context.getReplicatedLog().size());
}
- protected abstract RaftActorBehavior createBehavior(RaftActorContext actorContext);
- protected RaftActorBehavior createBehavior(){
- return createBehavior(new MockRaftActorContext());
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+ ActorRef actorRef, RaftRPC rpc) throws Exception {
+
+ Payload p = new MockRaftActorContext.MockPayload("");
+ setLastLogEntry((MockRaftActorContext) actorContext, 1, 0, p);
+ actorContext.getTermInformation().update(1, "test");
+
+ RaftActorBehavior origBehavior = createBehavior(actorContext);
+ RaftActorBehavior raftBehavior = origBehavior.handleMessage(actorRef, rpc);
+
+ assertEquals("New raft state", RaftState.Follower, raftBehavior.state());
+ assertEquals("New election term", rpc.getTerm(), actorContext.getTermInformation().getCurrentTerm());
+
+ origBehavior.close();
+ raftBehavior.close();
+ }
+
+ protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
+ MockRaftActorContext actorContext, long term, long index, Payload data) {
+ return setLastLogEntry(actorContext,
+ new MockRaftActorContext.MockReplicatedLogEntry(term, index, data));
+ }
+
+ protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(MockRaftActorContext actorContext,
+ ReplicatedLogEntry logEntry) {
+ MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(logEntry);
+ actorContext.setReplicatedLog(log);
+
+ return log;
+ }
+
+ protected abstract RaftActorBehavior createBehavior(
+ RaftActorContext actorContext);
+
+ protected RaftActorBehavior createBehavior() {
+ return createBehavior(createActorContext());
}
- protected AppendEntries createAppendEntriesWithNewerTerm(){
- return new AppendEntries(100, "leader-1", 0, 0, null, 1);
+ protected MockRaftActorContext createActorContext() {
+ return new MockRaftActorContext();
}
- protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm(){
- return new AppendEntriesReply(100, false);
+ protected MockRaftActorContext createActorContext(ActorRef actor) {
+ return new MockRaftActorContext("test", getSystem(), actor);
}
- protected RequestVote createRequestVoteWithNewerTerm(){
+ protected AppendEntries createAppendEntriesWithNewerTerm() {
+ return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
+ }
+
+ protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
+ return new AppendEntriesReply("follower-1", 100, false, 100, 100);
+ }
+
+ protected RequestVote createRequestVoteWithNewerTerm() {
return new RequestVote(100, "candidate-1", 10, 100);
}
- protected RequestVoteReply createRequestVoteReplyWithNewerTerm(){
+ protected RequestVoteReply createRequestVoteReplyWithNewerTerm() {
return new RequestVoteReply(100, false);
}
+ protected Object fromSerializableMessage(Object serializable){
+ return SerializationUtils.fromSerializable(serializable);
+ }
+
+ protected ByteString toByteString(Map<String, String> state) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try(ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+ oos.writeObject(state);
+ return ByteString.copyFrom(bos.toByteArray());
+ } catch (IOException e) {
+ throw new AssertionError("IOException occurred converting Map to Bytestring", e);
+ }
+ }
+
+ protected void logStart(String name) {
+ LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
+ }
}