X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=8f33d94700bc4a87231c44ab66758bd7c28bf049;hp=3336cbd9bec1bbc9feaac98610dd8bea85a85a6f;hb=e3918142bf119759d092af5c7fa72807b3a467e0;hpb=1a6cfbeec045f1f12ddf82b39345f00b3eeb69f8 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 3336cbd9be..8f33d94700 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -232,6 +232,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { purgeInMemoryLog(); } + //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event + sendUpdatesToFollower(followerId, followerLogInformation, false); return this; } @@ -344,6 +346,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.markFollowerActive(); if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + boolean wasLastChunk = false; if (reply.isSuccess()) { if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply @@ -371,6 +374,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // we can remove snapshot from the memory setSnapshot(Optional.absent()); } + wasLastChunk = true; } else { followerToSnapshot.markSendStatus(true); @@ -381,6 +385,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(false); } + + if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { + ActorSelection followerActor = context.getPeerActorSelection(followerId); + if(followerActor != null) { + sendSnapshotChunk(followerActor, followerId); + } + } + } else { LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", context.getId(), reply.getChunkIndex(), followerId, @@ -419,67 +431,77 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendAppendEntries() { // Send an AppendEntries to all followers - + long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis(); for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); - ActorSelection followerActor = context.getPeerActorSelection(followerId); + final FollowerLogInformation followerLogInformation = e.getValue(); + // This checks helps not to send a repeat message to the follower + if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) { + sendUpdatesToFollower(followerId, followerLogInformation, true); + } + } + } - if (followerActor != null) { - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long followerNextIndex = followerLogInformation.getNextIndex(); - boolean isFollowerActive = followerLogInformation.isFollowerActive(); + /** + * + * This method checks if any update needs to be sent to the given follower. This includes append log entries, + * sending next snapshot chunk, and initiating a snapshot. + * @return true if any update is sent, false otherwise + */ - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); - if (followerToSnapshot != null) { - // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && followerToSnapshot.canSendNextChunk()) { - sendSnapshotChunk(followerActor, followerId); - } else { - // we send a heartbeat even if we have not received a reply for the last chunk - sendAppendEntriesToFollower(followerActor, followerNextIndex, - Collections.emptyList(), followerId); - } + private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, + boolean sendHeartbeat) { + + ActorSelection followerActor = context.getPeerActorSelection(followerId); + if (followerActor != null) { + long followerNextIndex = followerLogInformation.getNextIndex(); + boolean isFollowerActive = followerLogInformation.isFollowerActive(); + + if (mapFollowerToSnapshot.get(followerId) != null) { + // if install snapshot is in process , then sent next chunk if possible + if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + sendSnapshotChunk(followerActor, followerId); + } else if(sendHeartbeat) { + // we send a heartbeat even if we have not received a reply for the last chunk + sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), + Collections.emptyList(), followerId); + } + } else { + long leaderLastIndex = context.getReplicatedLog().lastIndex(); + long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); + if (isFollowerActive && + context.getReplicatedLog().isPresent(followerNextIndex)) { + // FIXME : Sending one entry at a time + final List entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - } else { - long leaderLastIndex = context.getReplicatedLog().lastIndex(); - long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - final List entries; - - LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", - context.getId(), leaderLastIndex, leaderSnapShotIndex); - - if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { - LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(), - followerNextIndex, followerId); - - // FIXME : Sending one entry at a time - entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - - } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex >= followerNextIndex ) { - // if the followers next index is not present in the leaders log, and - // if the follower is just not starting and if leader's index is more than followers index - // then snapshot should be sent - - if(LOG.isDebugEnabled()) { - LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + - "follower-nextIndex: %s, leader-snapshot-index: %s, " + - "leader-last-index: %s", context.getId(), followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); - } - actor().tell(new InitiateInstallSnapshot(), actor()); - - // we would want to sent AE as the capture snapshot might take time - entries = Collections.emptyList(); - - } else { - //we send an AppendEntries, even if the follower is inactive - // in-order to update the followers timestamp, in case it becomes active again - entries = Collections.emptyList(); + sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); + + } else if (isFollowerActive && followerNextIndex >= 0 && + leaderLastIndex >= followerNextIndex) { + // if the followers next index is not present in the leaders log, and + // if the follower is just not starting and if leader's index is more than followers index + // then snapshot should be sent + + if (LOG.isDebugEnabled()) { + LOG.debug("InitiateInstallSnapshot to follower:{}," + + "follower-nextIndex:{}, leader-snapshot-index:{}, " + + "leader-last-index:{}", followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex + ); } + actor().tell(new InitiateInstallSnapshot(), actor()); - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); + // Send heartbeat to follower whenever install snapshot is initiated. + sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), + Collections.emptyList(), followerId); + + } else if(sendHeartbeat) { + //we send an AppendEntries, even if the follower is inactive + // in-order to update the followers timestamp, in case it becomes active again + sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), + Collections.emptyList(), followerId); } + } } }