From: Tom Pantelis Date: Tue, 2 Jun 2015 06:55:15 +0000 (-0400) Subject: Bug 2787: Batch AppendEntries to speed up follower sync X-Git-Tag: release/beryllium~515 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=8274ae55bc9eba37035a62f49d992f85391524ed Bug 2787: Batch AppendEntries to speed up follower sync AbstractLeader#sendUpdatesToFollower now attempts to send all entries to the follower to catch it up with 1 AppendEntries message. However, we don't want to send too large a message and risk exceeding akka's message size limit. So I added another param, maxDataSize, to ReplicatedLog#getFrom. It will attempt to add all entries up to maxEntries but stops if the accumulated data size exceeds maxDataSize. For maxDataSize, I reused the existing snapshotChunkSize (default 2M) defined in ConfigParams. This currently is hard-coded - we may want to make this configurable. Change-Id: Ib2c1165c4140a36f4eada8f81b4261260615dd18 Signed-off-by: Tom Pantelis (cherry picked from commit 6065ba82c90e366919a1b78105507b935b91af8e) --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index b4b2afbc4a..c245206f64 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -124,20 +124,43 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public List getFrom(long logEntryIndex) { - return getFrom(logEntryIndex, journal.size()); + return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE); } @Override - public List getFrom(long logEntryIndex, int max) { + public List getFrom(long logEntryIndex, int maxEntries, long maxDataSize) { int adjustedIndex = adjustedIndex(logEntryIndex); int size = journal.size(); if (adjustedIndex >= 0 && adjustedIndex < size) { // physical index should be less than list size and >= 0 - int maxIndex = adjustedIndex + max; + int maxIndex = adjustedIndex + maxEntries; if(maxIndex > size){ maxIndex = size; } - return new ArrayList<>(journal.subList(adjustedIndex, maxIndex)); + + if(maxDataSize == NO_MAX_SIZE) { + return new ArrayList<>(journal.subList(adjustedIndex, maxIndex)); + } else { + List retList = new ArrayList<>(maxIndex - adjustedIndex); + long totalSize = 0; + for(int i = adjustedIndex; i < maxIndex; i++) { + ReplicatedLogEntry entry = journal.get(i); + totalSize += entry.size(); + if(totalSize <= maxDataSize) { + retList.add(entry); + } else { + if(retList.isEmpty()) { + // Edge case - the first entry's size exceeds the threshold. We need to return + // at least the first entry so add it here. + retList.add(entry); + } + + break; + } + } + + return retList; + } } else { return Collections.emptyList(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index d4d13899eb..1b87f44f84 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -49,6 +49,8 @@ public class DefaultConfigParamsImpl implements ConfigParams { // in-memory journal can use before it needs to snapshot private int snapshotDataThresholdPercentage = 12; + private int snaphotChunkSize = SNAPSHOT_CHUNK_SIZE; + private long electionTimeoutFactor = 2; public void setHeartBeatInterval(FiniteDuration heartBeatInterval) { @@ -64,6 +66,10 @@ public class DefaultConfigParamsImpl implements ConfigParams { this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage; } + public void setSnaphotChunkSize(int snaphotChunkSize) { + this.snaphotChunkSize = snaphotChunkSize; + } + public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) { this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize; } @@ -109,7 +115,7 @@ public class DefaultConfigParamsImpl implements ConfigParams { @Override public int getSnapshotChunkSize() { - return SNAPSHOT_CHUNK_SIZE; + return snaphotChunkSize; } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 0ad1df3c33..9e99be1184 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -10,67 +10,61 @@ package org.opendaylight.controller.cluster.raft; import akka.japi.Procedure; import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** * Represents the ReplicatedLog that needs to be kept in sync by the RaftActor */ public interface ReplicatedLog { + long NO_MAX_SIZE = -1; + /** - * Get a replicated log entry at the specified index + * Return the replicated log entry at the specified index. * * @param index the index of the log entry - * @return the ReplicatedLogEntry at index. null if index less than 0 or + * @return the ReplicatedLogEntry if found, otherwise null if the adjusted index less than 0 or * greater than the size of the in-memory journal. */ - ReplicatedLogEntry get(long index); - + @Nullable ReplicatedLogEntry get(long index); /** - * Get the last replicated log entry - * - * @return + * Return the last replicated log entry in the log or null of not found. */ - ReplicatedLogEntry last(); + @Nullable ReplicatedLogEntry last(); /** - * - * @return + * Return the index of the last entry in the log or -1 if the log is empty. */ long lastIndex(); /** - * - * @return + * Return the term of the last entry in the log or -1 if the log is empty. */ long lastTerm(); /** - * To be called when we need to remove entries from the in-memory log. - * This method will remove all entries >= index. This method should be used - * during recovery to appropriately trim the log based on persisted - * information + * Removes entries from the in-memory log starting at the given index. * - * @param index the index of the log entry - * @return the adjusted index of the first log entry removed or -1 if log entry not found. + * @param index the index of the first log entry to remove + * @return the adjusted index of the first log entry removed or -1 if the log entry is not found. */ long removeFrom(long index); - /** - * To be called when we need to remove entries from the in-memory log and we - * need that information persisted to disk. This method will remove all - * entries >= index. + * Removes entries from the in-memory log a nd the persisted log starting at the given index. *

* The persisted information would then be used during recovery to properly * reconstruct the state of the in-memory replicated log * - * @param index the index of the log entry + * @param the index of the first log entry to remove */ void removeFromAndPersist(long index); /** - * Append an entry to the log - * @param replicatedLogEntry + * Appends an entry to the log. + * + * @param replicatedLogEntry the entry to append */ void append(ReplicatedLogEntry replicatedLogEntry); @@ -82,24 +76,32 @@ public interface ReplicatedLog { void increaseJournalLogCapacity(int amount); /** + * Appends an entry to the in-memory log and persists it as well. * - * @param replicatedLogEntry + * @param replicatedLogEntry the entry to append */ void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry); void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback); /** + * Returns a list of log entries starting from the given index to the end of the log. * - * @param index the index of the log entry + * @param index the index of the first log entry to get. + * @return the List of entries */ - List getFrom(long index); + @Nonnull List getFrom(long index); /** + * Returns a list of log entries starting from the given index up to the given maximum of entries or + * the given maximum accumulated size, whichever comes first. * - * @param index the index of the log entry + * @param index the index of the first log entry to get + * @param maxEntries the maximum number of entries to get + * @param maxDataSize the maximum accumulated size of the log entries to get + * @return the List of entries meeting the criteria. */ - List getFrom(long index, int max); + @Nonnull List getFrom(long index, int maxEntries, long maxDataSize); /** * diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 8fb66998c7..2eb3b32c6f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -504,9 +504,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), followerNextIndex, followerId); - // FIXME : Sending one entry at a time if(followerLogInformation.okToReplicate()) { - entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + // Try to send all the entries in the journal but not exceeding the max data size + // for a single AppendEntries message. + int maxEntries = (int) context.getReplicatedLog().size(); + entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries, + context.getConfigParams().getSnapshotChunkSize()); sendAppendEntries = true; } } else if (isFollowerActive && followerNextIndex >= 0 && diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index afec6a5d3d..a3e4396f37 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -55,7 +55,7 @@ public class AbstractReplicatedLogImplTest { Assert.assertNull("get(0)", replicatedLogImpl.get(0)); Assert.assertNull("last", replicatedLogImpl.last()); - List list = replicatedLogImpl.getFrom(0, 1); + List list = replicatedLogImpl.getFrom(0, 1, ReplicatedLog.NO_MAX_SIZE); assertEquals("getFrom size", 0, list.size()); assertEquals("removeFrom", -1, replicatedLogImpl.removeFrom(1)); @@ -136,19 +136,41 @@ public class AbstractReplicatedLogImplTest { } @Test - public void testGetFromWithMax(){ - List from = replicatedLogImpl.getFrom(0, 1); + public void testGetFromWithMax() { + List from = replicatedLogImpl.getFrom(0, 1, ReplicatedLog.NO_MAX_SIZE); Assert.assertEquals(1, from.size()); - Assert.assertEquals(1, from.get(0).getTerm()); + Assert.assertEquals("A", from.get(0).getData().toString()); - from = replicatedLogImpl.getFrom(0, 20); + from = replicatedLogImpl.getFrom(0, 20, ReplicatedLog.NO_MAX_SIZE); Assert.assertEquals(4, from.size()); - Assert.assertEquals(2, from.get(3).getTerm()); + Assert.assertEquals("A", from.get(0).getData().toString()); + Assert.assertEquals("D", from.get(3).getData().toString()); - from = replicatedLogImpl.getFrom(1, 2); + from = replicatedLogImpl.getFrom(1, 2, ReplicatedLog.NO_MAX_SIZE); Assert.assertEquals(2, from.size()); - Assert.assertEquals(1, from.get(1).getTerm()); + Assert.assertEquals("B", from.get(0).getData().toString()); + Assert.assertEquals("C", from.get(1).getData().toString()); + from = replicatedLogImpl.getFrom(1, 3, 2); + Assert.assertEquals(2, from.size()); + Assert.assertEquals("B", from.get(0).getData().toString()); + Assert.assertEquals("C", from.get(1).getData().toString()); + + from = replicatedLogImpl.getFrom(1, 3, 3); + Assert.assertEquals(3, from.size()); + Assert.assertEquals("B", from.get(0).getData().toString()); + Assert.assertEquals("C", from.get(1).getData().toString()); + Assert.assertEquals("D", from.get(2).getData().toString()); + + from = replicatedLogImpl.getFrom(1, 2, 3); + Assert.assertEquals(2, from.size()); + Assert.assertEquals("B", from.get(0).getData().toString()); + Assert.assertEquals("C", from.get(1).getData().toString()); + + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("12345"))); + from = replicatedLogImpl.getFrom(4, 2, 2); + Assert.assertEquals(1, from.size()); + Assert.assertEquals("12345", from.get(0).getData().toString()); } @Test diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index e19b521610..31a12d5659 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -435,7 +435,7 @@ public class MockRaftActorContext implements RaftActorContext { public MockReplicatedLogBuilder createEntries(int start, int end, int term) { for (int i=start; i appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - // Verify AppendEntries sent with the leader's second log entry. - appendEntries = appendEntriesList.get(0); assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - // Verify AppendEntries sent with the leader's third log entry. - appendEntries = appendEntriesList.get(1); - assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 2, appendEntries.getEntries().get(0).getIndex()); - assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersThirdLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); assertEquals("getNextIndex", 3, followerInfo.getNextIndex()); + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); + + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex()); assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex()); } @@ -1189,6 +1199,9 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setCommitIndex(leaderCommitIndex); leaderActorContext.setLastApplied(leaderCommitIndex); + ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); @@ -1215,26 +1228,37 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(followerActor, appendEntriesReply); - MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); - List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - // Verify AppendEntries sent with the leader's first log entry. - appendEntries = appendEntriesList.get(0); assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex()); assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - // Verify AppendEntries sent with the leader's second log entry. - appendEntries = appendEntriesList.get(1); - assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); - assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); + + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex()); assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex()); } @@ -1283,22 +1307,21 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(followerActor, appendEntriesReply); - MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); - List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - // Verify AppendEntries sent with the leader's first log entry. - appendEntries = appendEntriesList.get(0); assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex()); assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - // Verify AppendEntries sent with the leader's third log entry. - appendEntries = appendEntriesList.get(1); - assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); - assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); @@ -1381,6 +1404,88 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(RaftState.Leader, raftActorBehavior.state()); } + @Test + public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() { + logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build()); + long leaderCommitIndex = 3; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2); + ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + leader = new Leader(leaderActorContext); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); + + appendEntries = appendEntriesList.get(0); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); + + appendEntries = appendEntriesList.get(1); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersThirdLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersFourthLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 4, followerInfo.getNextIndex()); + + MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4); + + assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex()); + } + @Test public void testHandleRequestVoteReply(){ logStart("testHandleRequestVoteReply"); @@ -1463,83 +1568,6 @@ public class LeaderTest extends AbstractLeaderTest { }}; } - - @Test - public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception { - logStart("testAppendEntryCallAtEndofAppendEntryReply"); - - MockRaftActorContext leaderActorContext = createActorContextWithFollower(); - - DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); - configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); - - leaderActorContext.setConfigParams(configParams); - - MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); - - followerActorContext.setConfigParams(configParams); - followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); - - Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().setBehavior(follower); - - leaderActorContext.getReplicatedLog().removeFrom(0); - leaderActorContext.setCommitIndex(-1); - leaderActorContext.setLastApplied(-1); - - followerActorContext.getReplicatedLog().removeFrom(0); - followerActorContext.setCommitIndex(-1); - followerActorContext.setLastApplied(-1); - - leader = new Leader(leaderActorContext); - - AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching( - leaderActor, AppendEntriesReply.class); - - leader.handleMessage(followerActor, appendEntriesReply); - - // Clear initial heartbeat messages - - leaderActor.underlyingActor().clear(); - followerActor.underlyingActor().clear(); - - // create 3 entries - leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - leaderActorContext.setCommitIndex(1); - leaderActorContext.setLastApplied(1); - - Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); - - leader.handleMessage(leaderActor, new SendHeartBeat()); - - AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - - // Should send first log entry - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(0, appendEntries.getEntries().get(0).getIndex()); - assertEquals(-1, appendEntries.getPrevLogIndex()); - - appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - - assertEquals(1, appendEntriesReply.getLogLastTerm()); - assertEquals(0, appendEntriesReply.getLogLastIndex()); - - followerActor.underlyingActor().clear(); - - leader.handleAppendEntriesReply(followerActor, appendEntriesReply); - - appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - - // Should send second log entry - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(1, appendEntries.getEntries().get(0).getIndex()); - - follower.close(); - } - @Test public void testLaggingFollowerStarvation() throws Exception { logStart("testLaggingFollowerStarvation");