X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=27854f40e957a7a395fcd3e8c0cd5df871b3e00e;hb=6e4a2530fbd24f87b47a756bcc281cb825616e2a;hp=d19b669d27532278344ead713f3e7da17dfabeb2;hpb=859ad8ac73dab8064013a95ab4962eecbfb6d346;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index d19b669d27..27854f40e9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; @@ -43,12 +44,12 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; -import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; import scala.concurrent.duration.FiniteDuration; public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; + public static final String LEADER_ID = "leader"; private final TestActorRef leaderActor = actorFactory.createTestActor( Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader")); @@ -85,6 +86,8 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers"); MockRaftActorContext actorContext = createActorContextWithFollower(); + short payloadVersion = (short)5; + actorContext.setPayloadVersion(payloadVersion); long term = 1; actorContext.getTermInformation().update(term, ""); @@ -98,10 +101,11 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm()); assertEquals("Entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion()); // The follower would normally reply - simulate that explicitly here. leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex - 1, term)); + FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -118,6 +122,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entries size", 1, appendEntries.getEntries().size()); assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex()); assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion()); } @@ -146,7 +151,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -192,7 +197,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -232,7 +237,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -240,7 +245,7 @@ public class LeaderTest extends AbstractLeaderTest { for(int i=0;i<3;i++) { sendReplicate(actorContext, lastIndex+i+1); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex + i + 1, term)); + FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0)); } @@ -282,7 +287,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -327,7 +332,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -364,7 +369,7 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); @@ -485,10 +490,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(leaderActor, new SendHeartBeat()); - InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor, - InstallSnapshot.SERIALIZABLE_CLASS); - - InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto); + InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(snapshotIndex, is.getLastIncludedIndex()); } @@ -528,6 +530,8 @@ public class LeaderTest extends AbstractLeaderTest { new ReplicatedLogImplEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); + actorContext.getReplicatedLog().append(entry); + //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); @@ -537,7 +541,7 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue(raftBehavior instanceof Leader); - MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); } @Test @@ -585,7 +589,9 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); - CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); + + CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); @@ -596,8 +602,7 @@ public class LeaderTest extends AbstractLeaderTest { // if an initiate is started again when first is in progress, it shouldnt initiate Capture leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); - List captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class); - assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size()); + Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @Test @@ -637,8 +642,7 @@ public class LeaderTest extends AbstractLeaderTest { // check if installsnapshot gets called with the correct values. - InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable( - MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class)); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertNotNull(installSnapshot.getData()); assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex()); @@ -742,8 +746,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); - InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -752,8 +755,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), FOLLOWER_ID, installSnapshot.getChunkIndex(), true)); - installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(2, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -762,16 +764,14 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), FOLLOWER_ID, installSnapshot.getChunkIndex(), true)); - installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower followerActor.underlyingActor().clear(); leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), FOLLOWER_ID, installSnapshot.getChunkIndex(), true)); - installSnapshot = MessageCollectorActor.getFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class); Assert.assertNull(installSnapshot); } @@ -812,10 +812,10 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); leader.setSnapshot(Optional.of(bs)); + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); - InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -830,8 +830,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(leaderActor, new SendHeartBeat()); - installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -874,12 +873,11 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); - InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode()); + assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue()); int hashCode = installSnapshot.getData().hashCode(); @@ -888,12 +886,11 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(), FOLLOWER_ID, 1, true)); - installSnapshot = MessageCollectorActor.expectFirstMatching( - followerActor, InstallSnapshotMessages.InstallSnapshot.class); + installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertEquals(2, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(hashCode, installSnapshot.getLastChunkHashCode()); + assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue()); } @Test @@ -958,7 +955,7 @@ public class LeaderTest extends AbstractLeaderTest { @Override protected MockRaftActorContext createActorContext(ActorRef actorRef) { - return createActorContext("leader", actorRef); + return createActorContext(LEADER_ID, actorRef); } private MockRaftActorContext createActorContextWithFollower() { @@ -977,6 +974,15 @@ public class LeaderTest extends AbstractLeaderTest { return context; } + private MockRaftActorContext createFollowerActorContextWithLeader() { + MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl(); + followerConfig.setElectionTimeoutFactor(10000); + followerActorContext.setConfigParams(followerConfig); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); + return followerActorContext; + } + @Test public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); @@ -1037,14 +1043,15 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContext(); MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); - Map peerAddresses = new HashMap<>(); - peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); + Map leaderPeerAddresses = new HashMap<>(); + leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); - leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setPeerAddresses(leaderPeerAddresses); leaderActorContext.getReplicatedLog().removeFrom(0); @@ -1104,26 +1111,215 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplyFailure(){ - logStart("testHandleAppendEntriesReplyFailure"); + public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){ + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader"); MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + long leaderCommitIndex = 2; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); + followerActorContext.setCommitIndex(1); + followerActorContext.setLastApplied(1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); leader = new Leader(leaderActorContext); - // Send initial heartbeat reply with last index. - leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1)); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); + List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + + // Verify AppendEntries sent with the leader's second log entry. + appendEntries = appendEntriesList.get(0); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + + // Verify AppendEntries sent with the leader's third log entry. + appendEntries = appendEntriesList.get(1); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); - assertEquals("getNextIndex", 11, followerInfo.getNextIndex()); + assertEquals("getNextIndex", 3, followerInfo.getNextIndex()); - AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1); + assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex()); + } - RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); + @Test + public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() { + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty"); - assertEquals(RaftState.Leader, raftActorBehavior.state()); + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build()); + long leaderCommitIndex = 1; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + leader = new Leader(leaderActorContext); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - assertEquals("getNextIndex", 10, followerInfo.getNextIndex()); + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); + List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + + // Verify AppendEntries sent with the leader's first log entry. + appendEntries = appendEntriesList.get(0); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + + // Verify AppendEntries sent with the leader's second log entry. + appendEntries = appendEntriesList.get(1); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); + + assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex()); + } + + @Test + public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){ + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build()); + long leaderCommitIndex = 1; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + leader = new Leader(leaderActorContext); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); + List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + + // Verify AppendEntries sent with the leader's first log entry. + appendEntries = appendEntriesList.get(0); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + + // Verify AppendEntries sent with the leader's third log entry. + appendEntries = appendEntriesList.get(1); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); + + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); + + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex()); + assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm()); } @Test @@ -1141,7 +1337,8 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); - AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1); + short payloadVersion = 5; + AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); @@ -1164,6 +1361,9 @@ public class LeaderTest extends AbstractLeaderTest { ApplyState applyState = applyStateList.get(0); assertEquals(2, applyState.getReplicatedLogEntry().getIndex()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals(payloadVersion, followerInfo.getPayloadVersion()); } @Test @@ -1174,7 +1374,7 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); - AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1); + AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0); RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); @@ -1279,6 +1479,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); followerActorContext.setConfigParams(configParams); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); @@ -1379,7 +1580,7 @@ public class LeaderTest extends AbstractLeaderTest { for(int i=1;i<6;i++) { // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733) - RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1)); + RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0)); assertTrue(newBehavior == leader); Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); }