X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=bdfdd9b3765c576495e5bbf96dcdb19958c73cd5;hp=a4753a4fe432654554a64578af0b09233069146b;hb=bddd1b6405fe8e7616fdc93838249b504da098c6;hpb=576efc4bd225c62269108466aaaa2c4a2dfd4d65 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index a4753a4fe4..bdfdd9b376 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -134,7 +133,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * * @return Collection of follower IDs */ - protected final Collection getFollowerIds() { + public final Collection getFollowerIds() { return followerToLog.keySet(); } @@ -235,7 +234,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } - if (!context.isSnapshotCaptureInitiated()) { + if (!context.getSnapshotManager().isCapturing()) { purgeInMemoryLog(); } @@ -388,7 +387,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(false); } - if (wasLastChunk && !context.isSnapshotCaptureInitiated()) { + if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { // Since the follower is now caught up try to purge the log. purgeInMemoryLog(); } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { @@ -460,6 +459,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (followerActor != null) { long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); + boolean sendAppendEntries = false; + List entries = Collections.emptyList(); if (mapFollowerToSnapshot.get(followerId) != null) { // if install snapshot is in process , then sent next chunk if possible @@ -467,8 +468,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendSnapshotChunk(followerActor, followerId); } else if(sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); + sendAppendEntries = true; } } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); @@ -485,12 +485,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerNextIndex, followerId); // FIXME : Sending one entry at a time - final List entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); - + if(followerLogInformation.okToReplicate()) { + entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + sendAppendEntries = true; + } } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { + leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent @@ -503,19 +503,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } // Send heartbeat to follower whenever install snapshot is initiated. - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); - + sendAppendEntries = true; initiateCaptureSnapshot(followerId, followerNextIndex); } else if(sendHeartbeat) { - //we send an AppendEntries, even if the follower is inactive + // we send an AppendEntries, even if the follower is inactive // in-order to update the followers timestamp, in case it becomes active again - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); + sendAppendEntries = true; } } + + if(sendAppendEntries) { + sendAppendEntriesToFollower(followerActor, followerNextIndex, + entries, followerId); + } } } @@ -559,37 +561,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final ActorSelection followerActor = context.getPeerActorSelection(followerId); sendSnapshotChunk(followerActor, followerId); - } else if (!context.isSnapshotCaptureInitiated()) { - ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; - - if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { - lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); - lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); - } - - boolean isInstallSnapshotInitiated = true; - long replicatedToAllIndex = super.getReplicatedToAllIndex(); - ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); - - CaptureSnapshot captureSnapshot = new CaptureSnapshot( - lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, - (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), - (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1), - isInstallSnapshotInitiated); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId, - captureSnapshot); - } - - actor().tell(captureSnapshot, actor()); - context.setSnapshotCaptureInitiated(true); + } else { + context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); } } }