Merge "BUG 2849 : Reduce sending of duplicate replication messages"
authorTom Pantelis <tpanteli@brocade.com>
Tue, 24 Mar 2015 12:43:46 +0000 (12:43 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 24 Mar 2015 12:43:47 +0000 (12:43 +0000)
1  2 
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 3f085df8dc3b858879981c27e223262f2a5bc40f,d19b669d27532278344ead713f3e7da17dfabeb2..383ebefd36685f6298524ef87353281c325d1abd
@@@ -43,6 -43,7 +43,6 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
  import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 -import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
  import scala.concurrent.duration.FiniteDuration;
  
  public class LeaderTest extends AbstractLeaderTest {
          assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
      }
  
+     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                 1, index, payload);
+         actorContext.getReplicatedLog().append(newEntry);
+         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+     }
      @Test
      public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
          logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
          MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
                  1, lastIndex + 1, payload);
          actorContext.getReplicatedLog().append(newEntry);
-         RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                 new Replicate(null, null, newEntry));
+         RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
  
          // State should not change
          assertTrue(raftBehavior instanceof Leader);
          assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
      }
  
+     @Test
+     public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
+         MockRaftActorContext actorContext = createActorContextWithFollower();
+         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+             @Override
+             public FiniteDuration getHeartBeatInterval() {
+                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
+             }
+         });
+         long term = 1;
+         actorContext.getTermInformation().update(term, "");
+         leader = new Leader(actorContext);
+         // Leader will send an immediate heartbeat - ignore it.
+         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+         // The follower would normally reply - simulate that explicitly here.
+         long lastIndex = actorContext.getReplicatedLog().lastIndex();
+         leader.handleMessage(followerActor, new AppendEntriesReply(
+                 FOLLOWER_ID, term, true, lastIndex, term));
+         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+         followerActor.underlyingActor().clear();
+         for(int i=0;i<5;i++) {
+             sendReplicate(actorContext, lastIndex+i+1);
+         }
+         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+         // We expect only 1 message to be sent because of two reasons,
+         // - an append entries reply was not received
+         // - the heartbeat interval has not expired
+         // In this scenario if multiple messages are sent they would likely be duplicates
+         assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
+     }
+     @Test
+     public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+         logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
+         MockRaftActorContext actorContext = createActorContextWithFollower();
+         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+             @Override
+             public FiniteDuration getHeartBeatInterval() {
+                 return FiniteDuration.apply(5, TimeUnit.SECONDS);
+             }
+         });
+         long term = 1;
+         actorContext.getTermInformation().update(term, "");
+         leader = new Leader(actorContext);
+         // Leader will send an immediate heartbeat - ignore it.
+         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+         // The follower would normally reply - simulate that explicitly here.
+         long lastIndex = actorContext.getReplicatedLog().lastIndex();
+         leader.handleMessage(followerActor, new AppendEntriesReply(
+                 FOLLOWER_ID, term, true, lastIndex, term));
+         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+         followerActor.underlyingActor().clear();
+         for(int i=0;i<3;i++) {
+             sendReplicate(actorContext, lastIndex+i+1);
+             leader.handleMessage(followerActor, new AppendEntriesReply(
+                     FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+         }
+         for(int i=3;i<5;i++) {
+             sendReplicate(actorContext, lastIndex + i + 1);
+         }
+         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+         // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
+         // get sent to the follower - but not the 5th
+         assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+         for(int i=0;i<4;i++) {
+             long expected = allMessages.get(i).getEntries().get(0).getIndex();
+             assertEquals(expected, i+2);
+         }
+     }
+     @Test
+     public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+         logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
+         MockRaftActorContext actorContext = createActorContextWithFollower();
+         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+             @Override
+             public FiniteDuration getHeartBeatInterval() {
+                 return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
+             }
+         });
+         long term = 1;
+         actorContext.getTermInformation().update(term, "");
+         leader = new Leader(actorContext);
+         // Leader will send an immediate heartbeat - ignore it.
+         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+         // The follower would normally reply - simulate that explicitly here.
+         long lastIndex = actorContext.getReplicatedLog().lastIndex();
+         leader.handleMessage(followerActor, new AppendEntriesReply(
+                 FOLLOWER_ID, term, true, lastIndex, term));
+         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+         followerActor.underlyingActor().clear();
+         sendReplicate(actorContext, lastIndex+1);
+         // Wait slightly longer than heartbeat duration
+         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+         leader.handleMessage(leaderActor, new SendHeartBeat());
+         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+         assertEquals(1, allMessages.get(0).getEntries().size());
+         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+         assertEquals(1, allMessages.get(1).getEntries().size());
+         assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+     }
+     @Test
+     public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+         logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
+         MockRaftActorContext actorContext = createActorContextWithFollower();
+         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+             @Override
+             public FiniteDuration getHeartBeatInterval() {
+                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+             }
+         });
+         long term = 1;
+         actorContext.getTermInformation().update(term, "");
+         leader = new Leader(actorContext);
+         // Leader will send an immediate heartbeat - ignore it.
+         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+         // The follower would normally reply - simulate that explicitly here.
+         long lastIndex = actorContext.getReplicatedLog().lastIndex();
+         leader.handleMessage(followerActor, new AppendEntriesReply(
+                 FOLLOWER_ID, term, true, lastIndex, term));
+         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+         followerActor.underlyingActor().clear();
+         for(int i=0;i<3;i++) {
+             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+             leader.handleMessage(leaderActor, new SendHeartBeat());
+         }
+         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+         assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
+     }
+     @Test
+     public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+         logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
+         MockRaftActorContext actorContext = createActorContextWithFollower();
+         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+             @Override
+             public FiniteDuration getHeartBeatInterval() {
+                 return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+             }
+         });
+         long term = 1;
+         actorContext.getTermInformation().update(term, "");
+         leader = new Leader(actorContext);
+         // Leader will send an immediate heartbeat - ignore it.
+         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+         // The follower would normally reply - simulate that explicitly here.
+         long lastIndex = actorContext.getReplicatedLog().lastIndex();
+         leader.handleMessage(followerActor, new AppendEntriesReply(
+                 FOLLOWER_ID, term, true, lastIndex, term));
+         assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+         followerActor.underlyingActor().clear();
+         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+         leader.handleMessage(leaderActor, new SendHeartBeat());
+         sendReplicate(actorContext, lastIndex+1);
+         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+         assertEquals(0, allMessages.get(0).getEntries().size());
+         assertEquals(1, allMessages.get(1).getEntries().size());
+     }
      @Test
      public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
          logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
  
          leader.handleMessage(leaderActor, new SendHeartBeat());
  
 -        InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
 -                InstallSnapshot.SERIALIZABLE_CLASS);
 -
 -        InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
 +        InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
  
          assertEquals(snapshotIndex, is.getLastIncludedIndex());
      }
  
          // check if installsnapshot gets called with the correct values.
  
 -        InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
 -                MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
 +        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
  
          assertNotNull(installSnapshot.getData());
          assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
  
          leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
  
 -        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
 -                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 +        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
  
          assertEquals(1, installSnapshot.getChunkIndex());
          assertEquals(3, installSnapshot.getTotalChunks());
          leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
                  FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
  
 -        installSnapshot = MessageCollectorActor.expectFirstMatching(
 -                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 +        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
  
          assertEquals(2, installSnapshot.getChunkIndex());
          assertEquals(3, installSnapshot.getTotalChunks());
          leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
                  FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
  
 -        installSnapshot = MessageCollectorActor.expectFirstMatching(
 -                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 +        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
  
          // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
          followerActor.underlyingActor().clear();
          leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
                  FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
  
 -        installSnapshot = MessageCollectorActor.getFirstMatching(
 -                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 +        installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
  
          Assert.assertNull(installSnapshot);
      }
          ByteString bs = toByteString(leadersSnapshot);
          leader.setSnapshot(Optional.of(bs));
  
 +        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
          leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
  
 -        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
 -                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 +        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
  
          assertEquals(1, installSnapshot.getChunkIndex());
          assertEquals(3, installSnapshot.getTotalChunks());
  
          leader.handleMessage(leaderActor, new SendHeartBeat());
  
 -        installSnapshot = MessageCollectorActor.expectFirstMatching(
 -                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 +        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
  
          assertEquals(1, installSnapshot.getChunkIndex());
          assertEquals(3, installSnapshot.getTotalChunks());
  
          leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
  
 -        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
 -                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 +        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
  
          assertEquals(1, installSnapshot.getChunkIndex());
          assertEquals(3, installSnapshot.getTotalChunks());
 -        assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
 +        assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
  
          int hashCode = installSnapshot.getData().hashCode();
  
          leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
                  FOLLOWER_ID, 1, true));
  
 -        installSnapshot = MessageCollectorActor.expectFirstMatching(
 -                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 +        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
  
          assertEquals(2, installSnapshot.getChunkIndex());
          assertEquals(3, installSnapshot.getTotalChunks());
 -        assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
 +        assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
      }
  
      @Test