X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=bdfdd9b3765c576495e5bbf96dcdb19958c73cd5;hp=890d45e8fb664b20a4f27e8996d38b15e27a0120;hb=bddd1b6405fe8e7616fdc93838249b504da098c6;hpb=7f44a5812395762479de68d339c3d017db6c0e9d diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 890d45e8fb..bdfdd9b376 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -134,7 +133,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * * @return Collection of follower IDs */ - protected final Collection getFollowerIds() { + public final Collection getFollowerIds() { return followerToLog.keySet(); } @@ -158,8 +157,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(LOG.isTraceEnabled()) { LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); - } else if(LOG.isDebugEnabled() && !appendEntriesReply.isSuccess()) { - LOG.debug("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); } // Update the FollowerLogInformation @@ -174,7 +171,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(followerLogInformation.timeSinceLastActivity() > context.getConfigParams().getElectionTimeOutInterval().toMillis()) { - LOG.error("{} : handleAppendEntriesReply delayed beyond election timeout, " + + LOG.warn("{} : handleAppendEntriesReply delayed beyond election timeout, " + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(), context.getLastApplied(), context.getCommitIndex()); @@ -182,12 +179,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.markFollowerActive(); + boolean updated = false; if (appendEntriesReply.isSuccess()) { - followerLogInformation - .setMatchIndex(appendEntriesReply.getLogLastIndex()); - followerLogInformation - .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); + updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex()); + updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated; + + if(updated && LOG.isDebugEnabled()) { + LOG.debug("{}: handleAppendEntriesReply - FollowerLogInformation for {} updated: matchIndex: {}, nextIndex: {}", logName(), + followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); + } } else { + LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); // TODO: When we find that the follower is out of sync with the // Leader we simply decrement that followers next index by 1. @@ -224,18 +226,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { - LOG.debug("{}: handleAppendEntriesReply: applying to log - commitIndex: {}, lastAppliedIndex: {}", - logName(), context.getCommitIndex(), context.getLastApplied()); + if(LOG.isDebugEnabled()) { + LOG.debug("{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}", + logName(), followerId, context.getCommitIndex(), context.getLastApplied()); + } applyLogToStateMachine(context.getCommitIndex()); } - if (!context.isSnapshotCaptureInitiated()) { + if (!context.getSnapshotManager().isCapturing()) { purgeInMemoryLog(); } //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event - sendUpdatesToFollower(followerId, followerLogInformation, false, false); + sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); return this; } @@ -383,7 +387,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(false); } - if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { + if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { + // Since the follower is now caught up try to purge the log. + purgeInMemoryLog(); + } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { ActorSelection followerActor = context.getPeerActorSelection(followerId); if(followerActor != null) { sendSnapshotChunk(followerActor, followerId); @@ -452,6 +459,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (followerActor != null) { long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); + boolean sendAppendEntries = false; + List entries = Collections.emptyList(); if (mapFollowerToSnapshot.get(followerId) != null) { // if install snapshot is in process , then sent next chunk if possible @@ -459,16 +468,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendSnapshotChunk(followerActor, followerId); } else if(sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); + sendAppendEntries = true; } } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - if(!isHeartbeat || LOG.isTraceEnabled()) { - LOG.debug("{}: Checking sendAppendEntries for follower {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", - logName(), followerId, leaderLastIndex, leaderSnapShotIndex); + if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) { + LOG.debug("{}: Checking sendAppendEntries for follower {}, followerNextIndex {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", + logName(), followerId, followerNextIndex, leaderLastIndex, leaderSnapShotIndex); } if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { @@ -477,12 +485,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerNextIndex, followerId); // FIXME : Sending one entry at a time - final List entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); - + if(followerLogInformation.okToReplicate()) { + entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + sendAppendEntries = true; + } } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { + leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent @@ -495,19 +503,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } // Send heartbeat to follower whenever install snapshot is initiated. - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); - + sendAppendEntries = true; initiateCaptureSnapshot(followerId, followerNextIndex); } else if(sendHeartbeat) { - //we send an AppendEntries, even if the follower is inactive + // we send an AppendEntries, even if the follower is inactive // in-order to update the followers timestamp, in case it becomes active again - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); + sendAppendEntries = true; } } + + if(sendAppendEntries) { + sendAppendEntriesToFollower(followerActor, followerNextIndex, + entries, followerId); + } } } @@ -551,29 +561,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final ActorSelection followerActor = context.getPeerActorSelection(followerId); sendSnapshotChunk(followerActor, followerId); - } else if (!context.isSnapshotCaptureInitiated()) { - - LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", logName(), getLeaderId()); - ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; - - if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { - lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); - lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); - } - boolean isInstallSnapshotInitiated = true; - long replicatedToAllIndex = super.getReplicatedToAllIndex(); - ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); - actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, - (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), - (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1), - isInstallSnapshotInitiated), actor()); - context.setSnapshotCaptureInitiated(true); + } else { + context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); } } } @@ -619,10 +610,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ).toSerializable(), actor() ); - LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", - logName(), followerActor.path(), - followerToSnapshot.getChunkIndex(), - followerToSnapshot.getTotalChunks()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", + logName(), followerActor.path(), followerToSnapshot.getChunkIndex(), + followerToSnapshot.getTotalChunks()); + } } } catch (IOException e) { LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e);