package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.AbstractActorTest;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
+import org.slf4j.LoggerFactory;
public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
throws Exception {
new JavaTestKit(getSystem()) {{
- MockRaftActorContext context = (MockRaftActorContext)
- createActorContext();
+ MockRaftActorContext context = createActorContext();
// First set the receivers term to a high number (1000)
context.getTermInformation().update(1000, "test");
AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, null, 101);
+ new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
RaftActorBehavior behavior = createBehavior(context);
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
+ assertEquals(expected, raftBehavior);
// Also expect an AppendEntriesReply to be sent where success is false
final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
"AppendEntriesReply") {
// do not put code outside this method, will run afterwards
+ @Override
protected Boolean match(Object in) {
if (in instanceof AppendEntriesReply) {
AppendEntriesReply reply = (AppendEntriesReply) in;
new JavaTestKit(getSystem()) {
{
- MockRaftActorContext context = (MockRaftActorContext)
- createActorContext();
+ MockRaftActorContext context = createActorContext();
// First set the receivers term to lower number
context.getTermInformation().update(2, "test");
MockRaftActorContext.SimpleReplicatedLog log =
new MockRaftActorContext.SimpleReplicatedLog();
log.append(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
context.setReplicatedLog(log);
List<ReplicatedLogEntry> entries = new ArrayList<>();
entries.add(
- new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
AppendEntries appendEntries =
- new AppendEntries(2, "leader-1", -1, 1, entries, 0);
+ new AppendEntries(2, "leader-1", -1, 1, entries, 0, -1);
RaftActorBehavior behavior = createBehavior(context);
}
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
+ assertEquals(expected, raftBehavior);
assertEquals(1, log.size());
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
RaftActorBehavior behavior = createBehavior(
createActorContext(behaviorActor));
- RaftState raftState = behavior.handleMessage(getTestActor(),
+ RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
new RequestVote(1000, "test", 10000, 999));
- if(behavior.state() != RaftState.Follower){
- assertEquals(RaftState.Follower, raftState);
+ if(!(behavior instanceof Follower)){
+ assertTrue(raftBehavior instanceof Follower);
} else {
final Boolean out =
new ExpectMsg<Boolean>(duration("1 seconds"),
"RequestVoteReply") {
// do not put code outside this method, will run afterwards
+ @Override
protected Boolean match(Object in) {
if (in instanceof RequestVoteReply) {
RequestVoteReply reply =
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
RaftActorContext actorContext =
log = new MockRaftActorContext.SimpleReplicatedLog();
log.append(
new MockRaftActorContext.MockReplicatedLogEntry(20000,
- 1000000, ""));
+ 1000000, new MockRaftActorContext.MockPayload("")));
((MockRaftActorContext) actorContext).setReplicatedLog(log);
RaftActorBehavior behavior = createBehavior(actorContext);
- RaftState raftState = behavior.handleMessage(getTestActor(),
+ RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
new RequestVote(1000, "test", 10000, 999));
- if(behavior.state() != RaftState.Follower){
- assertEquals(RaftState.Follower, raftState);
+ if(!(behavior instanceof Follower)){
+ assertTrue(raftBehavior instanceof Follower);
} else {
final Boolean out =
new ExpectMsg<Boolean>(duration("1 seconds"),
"RequestVoteReply") {
// do not put code outside this method, will run afterwards
+ @Override
protected Boolean match(Object in) {
if (in instanceof RequestVoteReply) {
RequestVoteReply reply =
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
RaftActorContext context =
new ExpectMsg<Boolean>(duration("1 seconds"),
"RequestVoteReply") {
// do not put code outside this method, will run afterwards
+ @Override
protected Boolean match(Object in) {
if (in instanceof RequestVoteReply) {
RequestVoteReply reply =
}};
}
+ @Test
+ public void testPerformSnapshot() {
+ MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
+ AbstractRaftActorBehavior abstractBehavior = (AbstractRaftActorBehavior) createBehavior(context);
+ if (abstractBehavior instanceof Candidate) {
+ return;
+ }
+
+ context.getTermInformation().update(1, "test");
+
+ //log has 1 entry with replicatedToAllIndex = 0, does not do anything, returns the
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ context.setLastApplied(0);
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(1, context.getReplicatedLog().size());
+
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ context.setLastApplied(0);
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(2, context.getReplicatedLog().size());
+
+ //2 entries, lastApplied still 0, no purging.
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ context.setLastApplied(1);
+ abstractBehavior.performSnapshotWithoutCapture(0);
+ assertEquals(0, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(1, context.getReplicatedLog().size());
+
+ //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 5, 1).build());
+ context.setLastApplied(2);
+ abstractBehavior.performSnapshotWithoutCapture(3);
+ assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(3, context.getReplicatedLog().size());
+
+ // scenario where Last applied > Replicated to all index (becoz of a slow follower)
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ context.setLastApplied(2);
+ abstractBehavior.performSnapshotWithoutCapture(1);
+ assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
+ assertEquals(1, context.getReplicatedLog().size());
+ }
+
+
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
ActorRef actorRef, RaftRPC rpc) {
RaftActorContext actorContext = createActorContext();
+ Payload p = new MockRaftActorContext.MockPayload("");
setLastLogEntry(
- (MockRaftActorContext) actorContext, 0, 0, "");
+ (MockRaftActorContext) actorContext, 0, 0, p);
- RaftState raftState = createBehavior(actorContext)
+ RaftActorBehavior raftBehavior = createBehavior(actorContext)
.handleMessage(actorRef, rpc);
- assertEquals(RaftState.Follower, raftState);
+ assertTrue(raftBehavior instanceof Follower);
}
protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
- MockRaftActorContext actorContext, long term, long index, Object data) {
+ MockRaftActorContext actorContext, long term, long index, Payload data) {
return setLastLogEntry(actorContext,
new MockRaftActorContext.MockReplicatedLogEntry(term, index, data));
}
return createBehavior(createActorContext());
}
- protected RaftActorContext createActorContext() {
+ protected MockRaftActorContext createActorContext() {
return new MockRaftActorContext();
}
- protected RaftActorContext createActorContext(ActorRef actor) {
+ protected MockRaftActorContext createActorContext(ActorRef actor) {
return new MockRaftActorContext("test", getSystem(), actor);
}
protected AppendEntries createAppendEntriesWithNewerTerm() {
- return new AppendEntries(100, "leader-1", 0, 0, null, 1);
+ return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
}
protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
return new RequestVoteReply(100, false);
}
+ protected Object fromSerializableMessage(Object serializable){
+ return SerializationUtils.fromSerializable(serializable);
+ }
+ protected ByteString toByteString(Map<String, String> state) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try(ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+ oos.writeObject(state);
+ return ByteString.copyFrom(bos.toByteArray());
+ } catch (IOException e) {
+ throw new AssertionError("IOException occurred converting Map to Bytestring", e);
+ }
+ }
+ protected void logStart(String name) {
+ LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
+ }
}