From d1daa68ccdb73666524ae3673ee24361530b285b Mon Sep 17 00:00:00 2001 From: Harman Singh Date: Wed, 4 Feb 2015 17:43:48 -0800 Subject: [PATCH] Optimized AppendEntries sending to follower by adding AppendEntry call at end of reply, if needed. This will help to reduce the lag of a follower and leader does not have to wait for heartbeat time to send a message to follower. This way, we can tune heartbeat interval in better manner. Change-Id: I0259ea3691a3ac1245e36afe15b06ed44f377466 Signed-off-by: Harman Singh --- .../cluster/raft/FollowerLogInformation.java | 8 ++ .../raft/FollowerLogInformationImpl.java | 5 + .../raft/behaviors/AbstractLeader.java | 116 ++++++++++-------- .../cluster/raft/behaviors/LeaderTest.java | 112 ++++++++++++++++- 4 files changed, 183 insertions(+), 58 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 6d0c14e733..73c81afd18 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -73,4 +73,12 @@ public interface FollowerLogInformation { * This will stop the timeout clock */ void markFollowerInActive(); + + + /** + * This will return the active time of follower, since it was last reset + * @return time in milliseconds + */ + long timeSinceLastActivity(); + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 7a690d3d18..0fed63098d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -95,4 +95,9 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { stopwatch.stop(); } } + + @Override + public long timeSinceLastActivity() { + return stopwatch.elapsed(TimeUnit.MILLISECONDS); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 410dcee5e5..68cf5027df 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -232,6 +232,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { purgeInMemoryLog(); } + //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event + sendUpdatesToFollower(followerId, followerLogInformation, false); return this; } @@ -419,67 +421,77 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendAppendEntries() { // Send an AppendEntries to all followers - + long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis(); for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); - ActorSelection followerActor = context.getPeerActorSelection(followerId); - - if (followerActor != null) { - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long followerNextIndex = followerLogInformation.getNextIndex(); - boolean isFollowerActive = followerLogInformation.isFollowerActive(); - - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); - if (followerToSnapshot != null) { - // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && followerToSnapshot.canSendNextChunk()) { - sendSnapshotChunk(followerActor, followerId); - } else { - // we send a heartbeat even if we have not received a reply for the last chunk - sendAppendEntriesToFollower(followerActor, followerNextIndex, - Collections.emptyList(), followerId); - } - - } else { - long leaderLastIndex = context.getReplicatedLog().lastIndex(); - long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - final List entries; - - LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", - context.getId(), leaderLastIndex, leaderSnapShotIndex); - - if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { - LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(), - followerNextIndex, followerId); - - // FIXME : Sending one entry at a time - entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + final FollowerLogInformation followerLogInformation = e.getValue(); + // This checks helps not to send a repeat message to the follower + if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) { + sendUpdatesToFollower(followerId, followerLogInformation, true); + } + } + } - } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex >= followerNextIndex ) { - // if the followers next index is not present in the leaders log, and - // if the follower is just not starting and if leader's index is more than followers index - // then snapshot should be sent + /** + * + * This method checks if any update needs to be sent to the given follower. This includes append log entries, + * sending next snapshot chunk, and initiating a snapshot. + * @return true if any update is sent, false otherwise + */ - if(LOG.isDebugEnabled()) { - LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + - "follower-nextIndex: %s, leader-snapshot-index: %s, " + - "leader-last-index: %s", context.getId(), followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); - } - actor().tell(new InitiateInstallSnapshot(), actor()); + private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, + boolean sendHeartbeat) { + + ActorSelection followerActor = context.getPeerActorSelection(followerId); + if (followerActor != null) { + long followerNextIndex = followerLogInformation.getNextIndex(); + boolean isFollowerActive = followerLogInformation.isFollowerActive(); + + if (mapFollowerToSnapshot.get(followerId) != null) { + // if install snapshot is in process , then sent next chunk if possible + if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + sendSnapshotChunk(followerActor, followerId); + } else if(sendHeartbeat) { + // we send a heartbeat even if we have not received a reply for the last chunk + sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), + Collections.emptyList(), followerId); + } + } else { + long leaderLastIndex = context.getReplicatedLog().lastIndex(); + long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); + if (isFollowerActive && + context.getReplicatedLog().isPresent(followerNextIndex)) { + // FIXME : Sending one entry at a time + final List entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - // we would want to sent AE as the capture snapshot might take time - entries = Collections.emptyList(); + sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); - } else { - //we send an AppendEntries, even if the follower is inactive - // in-order to update the followers timestamp, in case it becomes active again - entries = Collections.emptyList(); + } else if (isFollowerActive && followerNextIndex >= 0 && + leaderLastIndex >= followerNextIndex) { + // if the followers next index is not present in the leaders log, and + // if the follower is just not starting and if leader's index is more than followers index + // then snapshot should be sent + + if (LOG.isDebugEnabled()) { + LOG.debug("InitiateInstallSnapshot to follower:{}," + + "follower-nextIndex:{}, leader-snapshot-index:{}, " + + "leader-last-index:{}", followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex + ); } + actor().tell(new InitiateInstallSnapshot(), actor()); - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); + // Send heartbeat to follower whenever install snapshot is initiated. + sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), + Collections.emptyList(), followerId); + + } else if(sendHeartbeat) { + //we send an AppendEntries, even if the follower is inactive + // in-order to update the followers timestamp, in case it becomes active again + sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), + Collections.emptyList(), followerId); } + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index b31cb621b3..63f94828eb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -86,6 +86,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { actorContext.setPeerAddresses(peerAddresses); Leader leader = new Leader(actorContext); + leader.markFollowerActive(followerActor.path().toString()); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); leader.handleMessage(senderActor, new SendHeartBeat()); final String out = @@ -133,6 +136,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { actorContext.setPeerAddresses(peerAddresses); Leader leader = new Leader(actorContext); + leader.markFollowerActive(followerActor.path().toString()); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); RaftActorBehavior raftBehavior = leader .handleMessage(senderActor, new Replicate(null, null, new MockRaftActorContext.MockReplicatedLogEntry(1, @@ -270,6 +276,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.getFollowerToSnapshot().getNextChunk(); leader.getFollowerToSnapshot().incrementChunkIndex(); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching( @@ -344,6 +353,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { //update follower timestamp leader.markFollowerActive(followerActor.path().toString()); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( senderActor, new Replicate(null, "state-id", entry)); @@ -578,7 +590,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception { + public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{ new JavaTestKit(getSystem()) {{ TestActorRef followerActor = @@ -632,11 +644,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals(3, installSnapshot.getTotalChunks()); - leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false)); + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), + followerActor.path().toString(), -1, false)); + + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); leader.handleMessage(leaderActor, new SendHeartBeat()); - o = MessageCollectorActor.getAllMessages(followerActor).get(1); + o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1); assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); @@ -655,7 +671,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { { TestActorRef followerActor = - TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower"); + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk"); Map peerAddresses = new HashMap<>(); peerAddresses.put(followerActor.path().toString(), @@ -709,10 +725,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true )); - leader.handleMessage(leaderActor, new SendHeartBeat()); - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + o = MessageCollectorActor.getAllMessages(followerActor).get(1); assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); @@ -874,6 +890,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { Leader leader = new Leader(leaderActorContext); leader.markFollowerActive(followerActor.path().toString()); + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); AppendEntries appendEntries = (AppendEntries) MessageCollectorActor @@ -942,6 +961,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { Leader leader = new Leader(leaderActorContext); leader.markFollowerActive(followerActor.path().toString()); + Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis()); + leader.handleMessage(leaderActor, new SendHeartBeat()); AppendEntries appendEntries = (AppendEntries) MessageCollectorActor @@ -1170,6 +1191,85 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { }}; } + + @Test + public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception { + new JavaTestKit(getSystem()) {{ + + ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext("leader", getSystem(), leaderActor); + + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + + leaderActorContext.setConfigParams(configParams); + + ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class)); + + MockRaftActorContext followerActorContext = + new MockRaftActorContext("follower-reply", getSystem(), followerActor); + + followerActorContext.setConfigParams(configParams); + + Follower follower = new Follower(followerActorContext); + + ForwardMessageToBehaviorActor.setBehavior(follower); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-reply", + followerActor.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + + leaderActorContext.getReplicatedLog().removeFrom(0); + + //create 3 entries + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + + leaderActorContext.setCommitIndex(1); + + Leader leader = new Leader(leaderActorContext); + leader.markFollowerActive("follower-reply"); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor + .getFirstMatching(followerActor, AppendEntries.class); + + assertNotNull(appendEntries); + + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(1, appendEntries.getEntries().get(0).getIndex()); + assertEquals(0, appendEntries.getPrevLogIndex()); + + AppendEntriesReply appendEntriesReply = + (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class); + + assertNotNull(appendEntriesReply); + + leader.handleAppendEntriesReply(followerActor, appendEntriesReply); + + List entries = ForwardMessageToBehaviorActor + .getAllMatching(followerActor, AppendEntries.class); + + assertEquals("AppendEntries count should be 2 ", 2, entries.size()); + + AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1); + + assertEquals(1, appendEntriesSecond.getLeaderCommit()); + assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex()); + assertEquals(1, appendEntriesSecond.getPrevLogIndex()); + + }}; + } + class MockLeader extends Leader { FollowerToSnapshot fts; -- 2.36.6