import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import java.io.ByteArrayOutputStream;
actorContext.getReplicatedLog().removeFrom(0);
- actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(0, 1,
- new MockRaftActorContext.MockPayload("foo")));
-
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(1, 1,
- new MockRaftActorContext.MockPayload("foo"));
-
- actorContext.getReplicatedLog().append(entry);
+ actorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
+ .build());
Leader leader = new Leader(actorContext);
RaftState raftState = leader
- .handleMessage(senderActor, new Replicate(null, "state-id",entry));
+ .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
// State should not change
assertEquals(RaftState.Leader, raftState);
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
new MockRaftActorContext.MockPayload("D"));
-
RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
assertEquals(RaftState.Leader, raftState);
return null;
}
+ public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
+ private static AbstractRaftActorBehavior behavior;
+
+ public ForwardMessageToBehaviorActor(){
+
+ }
+
+ @Override public void onReceive(Object message) throws Exception {
+ super.onReceive(message);
+ behavior.handleMessage(sender(), message);
+ }
+
+ public static void setBehavior(AbstractRaftActorBehavior behavior){
+ ForwardMessageToBehaviorActor.behavior = behavior;
+ }
+ }
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower", getSystem(), followerActor);
+
+ Follower follower = new Follower(followerActorContext);
+
+ ForwardMessageToBehaviorActor.setBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ //create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ // follower too has the exact same log entries and has the same commit index
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ followerActorContext.setCommitIndex(1);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntriesMessages.AppendEntries appendEntries =
+ (AppendEntriesMessages.AppendEntries) MessageCollectorActor
+ .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply =
+ (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertNotNull(appendEntriesReply);
+
+ // follower returns its next index
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ }};
+ }
+
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ ActorRef followerActor = getSystem().actorOf(
+ Props.create(ForwardMessageToBehaviorActor.class));
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower", getSystem(), followerActor);
+
+ Follower follower = new Follower(followerActorContext);
+
+ ForwardMessageToBehaviorActor.setBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ // follower has the same log entries but its commit index > leaders commit index
+ followerActorContext.setCommitIndex(2);
+
+ Leader leader = new Leader(leaderActorContext);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntriesMessages.AppendEntries appendEntries =
+ (AppendEntriesMessages.AppendEntries) MessageCollectorActor
+ .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply =
+ (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertNotNull(appendEntriesReply);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ }};
+ }
+
private static class LeaderTestKit extends JavaTestKit {
private LeaderTestKit(ActorSystem actorSystem) {