BUG 2849 : Reduce sending of duplicate replication messages 23/16723/2
authorMoiz Raja <moraja@cisco.com>
Wed, 11 Mar 2015 20:22:31 +0000 (13:22 -0700)
committerMoiz Raja <moraja@cisco.com>
Tue, 17 Mar 2015 23:02:39 +0000 (16:02 -0700)
With Raft not all append entries are expected to reach the followers
so it is a requirement that when acknowledgement of an append entries
is not received we send a duplicate append entries.

In replication scenarios where a lot of rapid replication messages
are being generated the lack of equally rapid acknowledgement results
in a lot of duplicate messages being generated. This later results in
a cycle when the replies for those duplicate replication messages start
arriving and result in even more duplicate replication messages being
sent.

This patch addresses this problem by disallowing a replication message
to be generated for a follower is the nextIndex for that follower
has not changed or the time since the last replication message exceeds
the heartbeat interval. This allows us to reduce duplicates but still
allows duplicates to get generated for legitimate reasons.

Change-Id: I0918672468f4fc1da935c45c7b46c750cde619f4
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 0beccd1b2b01ead56d4ea73235a3687fd73f408b..07b6b617aaa862b472fb479be247479e1096f434 100644 (file)
@@ -91,4 +91,10 @@ public interface FollowerLogInformation {
      */
     long timeSinceLastActivity();
 
+    /**
+     * This method checks if it is ok to replicate
+     *
+     * @return true if it is ok to replicate
+     */
+    boolean okToReplicate();
 }
index 288a540344bda27cab5276265ebcae6079c492fd..15063cff5b4a8a9c0114dbad73709f27172fc050 100644 (file)
@@ -26,6 +26,11 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private volatile long matchIndex;
 
+    private long lastReplicatedIndex = -1L;
+
+    private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
+
+
     public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
         this.id = id;
         this.nextIndex = context.getCommitIndex();
@@ -110,6 +115,28 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
         return stopwatch.elapsed(TimeUnit.MILLISECONDS);
     }
 
+    @Override
+    public boolean okToReplicate() {
+        // Return false if we are trying to send duplicate data before the heartbeat interval
+        if(getNextIndex() == lastReplicatedIndex){
+            if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
+                    .getHeartBeatInterval().toMillis()){
+                return false;
+            }
+        }
+
+        resetLastReplicated();
+        return true;
+    }
+
+    private void resetLastReplicated(){
+        lastReplicatedIndex = getNextIndex();
+        if(lastReplicatedStopwatch.isRunning()){
+            lastReplicatedStopwatch.reset();
+        }
+        lastReplicatedStopwatch.start();
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
@@ -120,6 +147,4 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
                 .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
         return builder.toString();
     }
-
-
 }
index a4753a4fe432654554a64578af0b09233069146b..ea08ffa9da2dd455cf159b6547b0baee2d5227b7 100644 (file)
@@ -460,6 +460,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (followerActor != null) {
             long followerNextIndex = followerLogInformation.getNextIndex();
             boolean isFollowerActive = followerLogInformation.isFollowerActive();
+            boolean sendAppendEntries = false;
+            List<ReplicatedLogEntry> entries = Collections.EMPTY_LIST;
 
             if (mapFollowerToSnapshot.get(followerId) != null) {
                 // if install snapshot is in process , then sent next chunk if possible
@@ -467,8 +469,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     sendSnapshotChunk(followerActor, followerId);
                 } else if(sendHeartbeat) {
                     // we send a heartbeat even if we have not received a reply for the last chunk
-                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
-                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
+                    sendAppendEntries = true;
                 }
             } else {
                 long leaderLastIndex = context.getReplicatedLog().lastIndex();
@@ -485,10 +486,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                             followerNextIndex, followerId);
 
                     // FIXME : Sending one entry at a time
-                    final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
-                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
-
+                    if(followerLogInformation.okToReplicate()) {
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+                        sendAppendEntries = true;
+                    }
                 } else if (isFollowerActive && followerNextIndex >= 0 &&
                     leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
                     // if the followers next index is not present in the leaders log, and
@@ -503,19 +504,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     }
 
                     // Send heartbeat to follower whenever install snapshot is initiated.
-                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
-                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
-
+                    sendAppendEntries = true;
                     initiateCaptureSnapshot(followerId, followerNextIndex);
 
                 } else if(sendHeartbeat) {
-                    //we send an AppendEntries, even if the follower is inactive
+                    // we send an AppendEntries, even if the follower is inactive
                     // in-order to update the followers timestamp, in case it becomes active again
-                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
-                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
+                    sendAppendEntries = true;
                 }
 
             }
+
+            if(sendAppendEntries) {
+                sendAppendEntriesToFollower(followerActor, followerNextIndex,
+                        entries, followerId);
+            }
         }
     }
 
index 5be9030f5957bd361c73ead2d11353eb5d178f95..bdfd69ec119acab3a19703de8578702c10984c05 100644 (file)
@@ -60,4 +60,24 @@ public class FollowerLogInformationImplTest {
         stopwatch.stop();
         return stopwatch.elapsed(TimeUnit.MILLISECONDS);
     }
+
+    @Test
+    public void testOkToReplicate(){
+        MockRaftActorContext context = new MockRaftActorContext();
+        context.setCommitIndex(9);
+        FollowerLogInformation followerLogInformation =
+                new FollowerLogInformationImpl(
+                        "follower1", 10, context);
+
+        assertTrue(followerLogInformation.okToReplicate());
+        assertFalse(followerLogInformation.okToReplicate());
+
+        // wait for 150 milliseconds and it should work again
+        Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+        assertTrue(followerLogInformation.okToReplicate());
+
+        //increment next index and try immediately and it should work again
+        followerLogInformation.incrNextIndex();
+        assertTrue(followerLogInformation.okToReplicate());
+    }
 }
index 6964db51f273f52ac0591b80cf75edf5997ec68c..d19b669d27532278344ead713f3e7da17dfabeb2 100644 (file)
@@ -120,6 +120,15 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
     }
 
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                1, index, payload);
+        actorContext.getReplicatedLog().append(newEntry);
+        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+    }
+
     @Test
     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
@@ -146,8 +155,7 @@ public class LeaderTest extends AbstractLeaderTest {
         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
                 1, lastIndex + 1, payload);
         actorContext.getReplicatedLog().append(newEntry);
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new Replicate(null, null, newEntry));
+        RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -161,6 +169,218 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
     }
 
+    @Test
+    public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+        logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(5, TimeUnit.SECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<5;i++) {
+            sendReplicate(actorContext, lastIndex+i+1);
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // We expect only 1 message to be sent because of two reasons,
+        // - an append entries reply was not received
+        // - the heartbeat interval has not expired
+        // In this scenario if multiple messages are sent they would likely be duplicates
+        assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
+    }
+
+    @Test
+    public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+        logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(5, TimeUnit.SECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<3;i++) {
+            sendReplicate(actorContext, lastIndex+i+1);
+            leader.handleMessage(followerActor, new AppendEntriesReply(
+                    FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+
+        }
+
+        for(int i=3;i<5;i++) {
+            sendReplicate(actorContext, lastIndex + i + 1);
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
+        // get sent to the follower - but not the 5th
+        assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+
+        for(int i=0;i<4;i++) {
+            long expected = allMessages.get(i).getEntries().get(0).getIndex();
+            assertEquals(expected, i+2);
+        }
+    }
+
+    @Test
+    public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+        logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        sendReplicate(actorContext, lastIndex+1);
+
+        // Wait slightly longer than heartbeat duration
+        Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, new SendHeartBeat());
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+        assertEquals(1, allMessages.get(0).getEntries().size());
+        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+        assertEquals(1, allMessages.get(1).getEntries().size());
+        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+
+    }
+
+    @Test
+    public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+        logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<3;i++) {
+            Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
+    }
+
+    @Test
+    public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+        logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
+        sendReplicate(actorContext, lastIndex+1);
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+        assertEquals(0, allMessages.get(0).getEntries().size());
+        assertEquals(1, allMessages.get(1).getEntries().size());
+    }
+
+
     @Test
     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");