Merge "BUG 2849 : Reduce sending of duplicate replication messages"
authorTom Pantelis <tpanteli@brocade.com>
Tue, 24 Mar 2015 12:43:46 +0000 (12:43 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 24 Mar 2015 12:43:47 +0000 (12:43 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 0beccd1b2b01ead56d4ea73235a3687fd73f408b..07b6b617aaa862b472fb479be247479e1096f434 100644 (file)
@@ -91,4 +91,10 @@ public interface FollowerLogInformation {
      */
     long timeSinceLastActivity();
 
+    /**
+     * This method checks if it is ok to replicate
+     *
+     * @return true if it is ok to replicate
+     */
+    boolean okToReplicate();
 }
index 288a540344bda27cab5276265ebcae6079c492fd..15063cff5b4a8a9c0114dbad73709f27172fc050 100644 (file)
@@ -26,6 +26,11 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private volatile long matchIndex;
 
+    private long lastReplicatedIndex = -1L;
+
+    private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
+
+
     public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
         this.id = id;
         this.nextIndex = context.getCommitIndex();
@@ -110,6 +115,28 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
         return stopwatch.elapsed(TimeUnit.MILLISECONDS);
     }
 
+    @Override
+    public boolean okToReplicate() {
+        // Return false if we are trying to send duplicate data before the heartbeat interval
+        if(getNextIndex() == lastReplicatedIndex){
+            if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
+                    .getHeartBeatInterval().toMillis()){
+                return false;
+            }
+        }
+
+        resetLastReplicated();
+        return true;
+    }
+
+    private void resetLastReplicated(){
+        lastReplicatedIndex = getNextIndex();
+        if(lastReplicatedStopwatch.isRunning()){
+            lastReplicatedStopwatch.reset();
+        }
+        lastReplicatedStopwatch.start();
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
@@ -120,6 +147,4 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
                 .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
         return builder.toString();
     }
-
-
 }
index a4753a4fe432654554a64578af0b09233069146b..ea08ffa9da2dd455cf159b6547b0baee2d5227b7 100644 (file)
@@ -460,6 +460,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (followerActor != null) {
             long followerNextIndex = followerLogInformation.getNextIndex();
             boolean isFollowerActive = followerLogInformation.isFollowerActive();
+            boolean sendAppendEntries = false;
+            List<ReplicatedLogEntry> entries = Collections.EMPTY_LIST;
 
             if (mapFollowerToSnapshot.get(followerId) != null) {
                 // if install snapshot is in process , then sent next chunk if possible
@@ -467,8 +469,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     sendSnapshotChunk(followerActor, followerId);
                 } else if(sendHeartbeat) {
                     // we send a heartbeat even if we have not received a reply for the last chunk
-                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
-                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
+                    sendAppendEntries = true;
                 }
             } else {
                 long leaderLastIndex = context.getReplicatedLog().lastIndex();
@@ -485,10 +486,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                             followerNextIndex, followerId);
 
                     // FIXME : Sending one entry at a time
-                    final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
-                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
-
+                    if(followerLogInformation.okToReplicate()) {
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+                        sendAppendEntries = true;
+                    }
                 } else if (isFollowerActive && followerNextIndex >= 0 &&
                     leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
                     // if the followers next index is not present in the leaders log, and
@@ -503,19 +504,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     }
 
                     // Send heartbeat to follower whenever install snapshot is initiated.
-                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
-                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
-
+                    sendAppendEntries = true;
                     initiateCaptureSnapshot(followerId, followerNextIndex);
 
                 } else if(sendHeartbeat) {
-                    //we send an AppendEntries, even if the follower is inactive
+                    // we send an AppendEntries, even if the follower is inactive
                     // in-order to update the followers timestamp, in case it becomes active again
-                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
-                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
+                    sendAppendEntries = true;
                 }
 
             }
+
+            if(sendAppendEntries) {
+                sendAppendEntriesToFollower(followerActor, followerNextIndex,
+                        entries, followerId);
+            }
         }
     }
 
index 5be9030f5957bd361c73ead2d11353eb5d178f95..bdfd69ec119acab3a19703de8578702c10984c05 100644 (file)
@@ -60,4 +60,24 @@ public class FollowerLogInformationImplTest {
         stopwatch.stop();
         return stopwatch.elapsed(TimeUnit.MILLISECONDS);
     }
+
+    @Test
+    public void testOkToReplicate(){
+        MockRaftActorContext context = new MockRaftActorContext();
+        context.setCommitIndex(9);
+        FollowerLogInformation followerLogInformation =
+                new FollowerLogInformationImpl(
+                        "follower1", 10, context);
+
+        assertTrue(followerLogInformation.okToReplicate());
+        assertFalse(followerLogInformation.okToReplicate());
+
+        // wait for 150 milliseconds and it should work again
+        Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+        assertTrue(followerLogInformation.okToReplicate());
+
+        //increment next index and try immediately and it should work again
+        followerLogInformation.incrNextIndex();
+        assertTrue(followerLogInformation.okToReplicate());
+    }
 }
index 3f085df8dc3b858879981c27e223262f2a5bc40f..383ebefd36685f6298524ef87353281c325d1abd 100644 (file)
@@ -119,6 +119,15 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
     }
 
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                1, index, payload);
+        actorContext.getReplicatedLog().append(newEntry);
+        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+    }
+
     @Test
     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
@@ -145,8 +154,7 @@ public class LeaderTest extends AbstractLeaderTest {
         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
                 1, lastIndex + 1, payload);
         actorContext.getReplicatedLog().append(newEntry);
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new Replicate(null, null, newEntry));
+        RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -160,6 +168,218 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
     }
 
+    @Test
+    public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+        logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(5, TimeUnit.SECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<5;i++) {
+            sendReplicate(actorContext, lastIndex+i+1);
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // We expect only 1 message to be sent because of two reasons,
+        // - an append entries reply was not received
+        // - the heartbeat interval has not expired
+        // In this scenario if multiple messages are sent they would likely be duplicates
+        assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
+    }
+
+    @Test
+    public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+        logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(5, TimeUnit.SECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<3;i++) {
+            sendReplicate(actorContext, lastIndex+i+1);
+            leader.handleMessage(followerActor, new AppendEntriesReply(
+                    FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+
+        }
+
+        for(int i=3;i<5;i++) {
+            sendReplicate(actorContext, lastIndex + i + 1);
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
+        // get sent to the follower - but not the 5th
+        assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+
+        for(int i=0;i<4;i++) {
+            long expected = allMessages.get(i).getEntries().get(0).getIndex();
+            assertEquals(expected, i+2);
+        }
+    }
+
+    @Test
+    public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+        logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        sendReplicate(actorContext, lastIndex+1);
+
+        // Wait slightly longer than heartbeat duration
+        Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, new SendHeartBeat());
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+        assertEquals(1, allMessages.get(0).getEntries().size());
+        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+        assertEquals(1, allMessages.get(1).getEntries().size());
+        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+
+    }
+
+    @Test
+    public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+        logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<3;i++) {
+            Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
+    }
+
+    @Test
+    public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+        logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
+        sendReplicate(actorContext, lastIndex+1);
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+        assertEquals(0, allMessages.get(0).getEntries().size());
+        assertEquals(1, allMessages.get(1).getEntries().size());
+    }
+
+
     @Test
     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");