X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollower.java;h=618865cb88eb8877cdcfdcfb29208c80707c2c0f;hp=c799441d603597ea25d530db8dc6eddc9ced68b6;hb=6eb0469b729fbe69dad58ce6223fc231669089c7;hpb=471ac7d59e93db65fd49733d5185bc1996f7ad43 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index c799441d60..618865cb88 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -36,12 +37,18 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; */ public class Follower extends AbstractRaftActorBehavior { + + private SnapshotTracker snapshotTracker = null; + private final InitialSyncStatusTracker initialSyncStatusTracker; + public Follower(RaftActorContext context) { super(context, RaftState.Follower); scheduleElection(electionDuration()); + + initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor()); } private boolean isLogEntryPresent(long index){ @@ -71,6 +78,10 @@ public class Follower extends AbstractRaftActorBehavior { return -1; } + private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){ + initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex()); + } + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { @@ -97,6 +108,7 @@ public class Follower extends AbstractRaftActorBehavior { long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex()); boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex()); + updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId()); boolean outOfSync = true; @@ -280,7 +292,6 @@ public class Follower extends AbstractRaftActorBehavior { private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) { - LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}", logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(), installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()); @@ -289,6 +300,8 @@ public class Follower extends AbstractRaftActorBehavior { snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks()); } + updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId()); + try { if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(), installSnapshot.getLastChunkHashCode())){ @@ -338,4 +351,36 @@ public class Follower extends AbstractRaftActorBehavior { SnapshotTracker getSnapshotTracker(){ return snapshotTracker; } + + private static class InitialSyncStatusTracker { + + private static final long INVALID_LOG_INDEX = -2L; + private long initialLeaderCommit = INVALID_LOG_INDEX; + private boolean initialSyncUpDone = false; + private String syncedLeaderId = null; + private final ActorRef actor; + + public InitialSyncStatusTracker(ActorRef actor) { + this.actor = actor; + } + + public void update(String leaderId, long leaderCommit, long commitIndex){ + + if(!leaderId.equals(syncedLeaderId)){ + initialSyncUpDone = false; + initialLeaderCommit = INVALID_LOG_INDEX; + syncedLeaderId = leaderId; + } + + if(!initialSyncUpDone){ + if(initialLeaderCommit == INVALID_LOG_INDEX){ + actor.tell(new FollowerInitialSyncUpStatus(false), ActorRef.noSender()); + initialLeaderCommit = leaderCommit; + } else if(commitIndex >= initialLeaderCommit){ + actor.tell(new FollowerInitialSyncUpStatus(true), ActorRef.noSender()); + initialSyncUpDone = true; + } + } + } + } }