package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
import com.google.protobuf.ByteString;
-import junit.framework.Assert;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static akka.pattern.Patterns.ask;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
public class FollowerTest extends AbstractRaftActorBehaviorTest {
Follower follower =
new Follower(raftActorContext);
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
follower.handleMessage(followerActor, new ElectionTimeout());
- Assert.assertEquals(RaftState.Candidate, raftState);
+ assertTrue(raftBehavior instanceof Candidate);
}
@Test
// The new commitIndex is 101
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", 100, 1, entries, 101);
+ new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
createBehavior(context).handleMessage(getRef(), appendEntries);
assertEquals(101L, context.getLastApplied());
// AppendEntries is now sent with a bigger term
// this will set the receivers term to be the same as the sender's term
AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
+ new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
RaftActorBehavior behavior = createBehavior(context);
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
+ assertEquals(expected, raftBehavior);
// Also expect an AppendEntriesReply to be sent where success is false
final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
AppendEntries appendEntries =
- new AppendEntries(1, "leader-1", 2, 1, entries, 4);
+ new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
RaftActorBehavior behavior = createBehavior(context);
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
+ assertEquals(expected, raftBehavior);
assertEquals(5, log.last().getIndex() + 1);
assertNotNull(log.get(3));
assertNotNull(log.get(4));
// This will not work for a Candidate because as soon as a Candidate
// is created it increments the term
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", 1, 1, entries, 3);
+ new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1);
RaftActorBehavior behavior = createBehavior(context);
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
+ assertEquals(expected, raftBehavior);
// The entry at index 2 will be found out-of-sync with the leader
// and will be removed
}};
}
+ @Test
+ public void testHandleAppendEntriesPreviousLogEntryMissing(){
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+
+ AppendEntries appendEntries =
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftActorBehavior raftBehavior =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftBehavior);
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+
+ }};
+
+ }
+
+ @Test
+ public void testHandleAppendAfterInstallingSnapshot(){
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+
+ // Set up a log as if it has been snapshotted
+ log.setSnapshotIndex(3);
+ log.setSnapshotTerm(1);
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+
+ AppendEntries appendEntries =
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftActorBehavior raftBehavior =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftBehavior);
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+
+ }};
+
+ }
+
/**
* This test verifies that when InstallSnapshot is received by
int offset = 0;
int snapshotLength = bsSnapshot.size();
int i = 1;
+ int chunkIndex = 1;
do {
chunkData = getNextChunk(bsSnapshot, offset);
final InstallSnapshot installSnapshot =
new InstallSnapshot(1, "leader-1", i, 1,
- chunkData, i, 3);
+ chunkData, chunkIndex, 3);
follower.handleMessage(leaderActor, installSnapshot);
offset = offset + 50;
i++;
+ chunkIndex++;
} while ((offset+50) < snapshotLength);
- final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
+ final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
follower.handleMessage(leaderActor, installSnapshot3);
String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
}
}.get();
+ // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
+ assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
String applySnapshotMatch = "";
for (String reply: matches) {
if (reply.startsWith("applySnapshot")) {
}};
}
- public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
- FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
- Timeout operationTimeout = new Timeout(operationDuration);
- Future<Object> future = ask(actor, message, operationTimeout);
+ @Test
+ public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
+ JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
+ {
- try {
- return Await.result(future, operationDuration);
- } catch (Exception e) {
- throw e;
- }
+ ActorRef leaderActor = getSystem().actorOf(Props.create(
+ MessageCollectorActor.class));
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext(getRef());
+
+ Follower follower = (Follower) createBehavior(context);
+
+ HashMap<String, String> followerSnapshot = new HashMap<>();
+ followerSnapshot.put("1", "A");
+ followerSnapshot.put("2", "B");
+ followerSnapshot.put("3", "C");
+
+ ByteString bsSnapshot = toByteString(followerSnapshot);
+
+ final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
+ follower.handleMessage(leaderActor, installSnapshot);
+
+ Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+
+ assertNotNull(messages);
+ assertTrue(messages instanceof List);
+ List<Object> listMessages = (List<Object>) messages;
+
+ int installSnapshotReplyReceivedCount = 0;
+ for (Object message: listMessages) {
+ if (message instanceof InstallSnapshotReply) {
+ ++installSnapshotReplyReceivedCount;
+ }
+ }
+
+ assertEquals(1, installSnapshotReplyReceivedCount);
+ InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
+ assertEquals(false, reply.isSuccess());
+ assertEquals(-1, reply.getChunkIndex());
+ assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
+
+ }};
+ }
+
+ public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
+ return MessageCollectorActor.getAllMessages(actor);
}
public ByteString getNextChunk (ByteString bs, int offset){