X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollower.java;h=d16ddabddee988df72c90866a05486312ecc48d9;hp=c799441d603597ea25d530db8dc6eddc9ced68b6;hb=a89f83014714207a4ccb704eb75050d758266d71;hpb=bead4f9288b0c758ac736ee8945ca01313b177da diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index c799441d60..d16ddabdde 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -36,12 +37,23 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; */ public class Follower extends AbstractRaftActorBehavior { + + private SnapshotTracker snapshotTracker = null; + private final InitialSyncStatusTracker initialSyncStatusTracker; + public Follower(RaftActorContext context) { super(context, RaftState.Follower); - scheduleElection(electionDuration()); + initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor()); + + if(context.getPeerAddresses().isEmpty()){ + actor().tell(ELECTION_TIMEOUT, actor()); + } else { + scheduleElection(electionDuration()); + } + } private boolean isLogEntryPresent(long index){ @@ -71,6 +83,10 @@ public class Follower extends AbstractRaftActorBehavior { return -1; } + private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){ + initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex()); + } + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { @@ -85,18 +101,34 @@ public class Follower extends AbstractRaftActorBehavior { // to make it easier to read. Before refactoring ensure tests // cover the code properly + if (snapshotTracker != null) { + // if snapshot install is in progress, follower should just acknowledge append entries with a reply. + AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, + lastIndex(), lastTerm(), context.getPayloadVersion()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply); + } + sender.tell(reply, actor()); + + return this; + } + // 1. Reply false if term < currentTerm (§5.1) // This is handled in the appendEntries method of the base class // If we got here then we do appear to be talking to the leader leaderId = appendEntries.getLeaderId(); + setLeaderPayloadVersion(appendEntries.getPayloadVersion()); + // 2. Reply false if log doesn’t contain an entry at prevLogIndex // whose term matches prevLogTerm (§5.3) long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex()); boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex()); + updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId()); boolean outOfSync = true; @@ -138,7 +170,7 @@ public class Follower extends AbstractRaftActorBehavior { logName(), lastIndex, lastTerm()); sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, - lastTerm()), actor()); + lastTerm(), context.getPayloadVersion()), actor()); return this; } @@ -220,7 +252,7 @@ public class Follower extends AbstractRaftActorBehavior { } AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, - lastIndex, lastTerm()); + lastIndex, lastTerm(), context.getPayloadVersion()); if(LOG.isTraceEnabled()) { LOG.trace("{}: handleAppendEntries returning : {}", logName(), reply); @@ -230,7 +262,7 @@ public class Follower extends AbstractRaftActorBehavior { sender.tell(reply, actor()); - if (!context.isSnapshotCaptureInitiated()) { + if (!context.getSnapshotManager().isCapturing()) { super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex()); } @@ -280,7 +312,6 @@ public class Follower extends AbstractRaftActorBehavior { private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) { - LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}", logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(), installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()); @@ -289,6 +320,8 @@ public class Follower extends AbstractRaftActorBehavior { snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks()); } + updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId()); + try { if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(), installSnapshot.getLastChunkHashCode())){ @@ -338,4 +371,36 @@ public class Follower extends AbstractRaftActorBehavior { SnapshotTracker getSnapshotTracker(){ return snapshotTracker; } + + private class InitialSyncStatusTracker { + + private static final long INVALID_LOG_INDEX = -2L; + private long initialLeaderCommit = INVALID_LOG_INDEX; + private boolean initialSyncUpDone = false; + private String syncedLeaderId = null; + private final ActorRef actor; + + public InitialSyncStatusTracker(ActorRef actor) { + this.actor = actor; + } + + public void update(String leaderId, long leaderCommit, long commitIndex){ + + if(!leaderId.equals(syncedLeaderId)){ + initialSyncUpDone = false; + initialLeaderCommit = INVALID_LOG_INDEX; + syncedLeaderId = leaderId; + } + + if(!initialSyncUpDone){ + if(initialLeaderCommit == INVALID_LOG_INDEX){ + actor.tell(new FollowerInitialSyncUpStatus(false, getId()), ActorRef.noSender()); + initialLeaderCommit = leaderCommit; + } else if(commitIndex >= initialLeaderCommit){ + actor.tell(new FollowerInitialSyncUpStatus(true, getId()), ActorRef.noSender()); + initialSyncUpDone = true; + } + } + } + } }