X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollower.java;h=b512089692eed80da136bc8f7dfc2f5a2e10274c;hp=2cc2c261bb1d77f3699d5f41d233020b5e4e8b65;hb=37f4afe3c6b199d277b619bfc0056c5a96f8c3e1;hpb=5fd4213b5bfaf2db21f1b37139f6b98535a872c0 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 2cc2c261bb..b512089692 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -139,7 +139,7 @@ public class Follower extends AbstractRaftActorBehavior { if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) { log.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the " + "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId()); - snapshotTracker = null; + closeSnapshotTracker(); } if (snapshotTracker != null || context.getSnapshotManager().isApplying()) { @@ -165,11 +165,12 @@ public class Follower extends AbstractRaftActorBehavior { // We found that the log was out of sync so just send a negative // reply and return - log.debug("{}: Follower is out-of-sync, so sending negative reply, lastIndex: {}, lastTerm: {}", - logName(), lastIndex, lastTerm()); + final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, + lastTerm(), context.getPayloadVersion()); - sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, - lastTerm(), context.getPayloadVersion()), actor()); + log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply); + + sender.tell(reply, actor()); return this; } @@ -206,8 +207,7 @@ public class Follower extends AbstractRaftActorBehavior { if (!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) { - log.debug("{}: Removing entries from log starting at {}", logName(), - matchEntry.getIndex()); + log.info("{}: Removing entries from log starting at {}", logName(), matchEntry.getIndex()); // Entries do not match so remove all subsequent entries if (!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) { @@ -216,7 +216,7 @@ public class Follower extends AbstractRaftActorBehavior { // so we must send back a reply to force a snapshot to completely re-sync the // follower's log and state. - log.debug("{}: Could not remove entries - sending reply to force snapshot", logName()); + log.info("{}: Could not remove entries - sending reply to force snapshot", logName()); sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, lastTerm(), context.getPayloadVersion(), true), actor()); return this; @@ -284,6 +284,18 @@ public class Follower extends AbstractRaftActorBehavior { log.debug("{}: Commit index set to {}", logName(), context.getCommitIndex()); } + AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, + lastIndex, lastTerm(), context.getPayloadVersion()); + + if (log.isTraceEnabled()) { + log.trace("{}: handleAppendEntries returning : {}", logName(), reply); + } else if (log.isDebugEnabled() && numLogEntries > 0) { + log.debug("{}: handleAppendEntries returning : {}", logName(), reply); + } + + // Reply to the leader before applying any previous state so as not to hold up leader consensus. + sender.tell(reply, actor()); + // If commitIndex > lastApplied: increment lastApplied, apply // log[lastApplied] to state machine (§5.3) // check if there are any entries to be applied. last-applied can be equal to last-index @@ -298,17 +310,6 @@ public class Follower extends AbstractRaftActorBehavior { applyLogToStateMachine(appendEntries.getLeaderCommit()); } - AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, - lastIndex, lastTerm(), context.getPayloadVersion()); - - if (log.isTraceEnabled()) { - log.trace("{}: handleAppendEntries returning : {}", logName(), reply); - } else if (log.isDebugEnabled() && numLogEntries > 0) { - log.debug("{}: handleAppendEntries returning : {}", logName(), reply); - } - - sender.tell(reply, actor()); - if (!context.getSnapshotManager().isCapturing()) { super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex()); } @@ -330,14 +331,14 @@ public class Follower extends AbstractRaftActorBehavior { // an entry at prevLogIndex and this follower has no entries in // it's log. - log.debug("{}: The followers log is empty and the senders prevLogIndex is {}", + log.info("{}: The followers log is empty and the senders prevLogIndex is {}", logName(), appendEntries.getPrevLogIndex()); } else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) { // The follower's log is out of sync because the Leader's // prevLogIndex entry was not found in it's log - log.debug("{}: The log is not empty but the prevLogIndex {} was not found in it - " + log.info("{}: The log is not empty but the prevLogIndex {} was not found in it - " + "lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex, context.getReplicatedLog().getSnapshotIndex()); } else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) { @@ -346,7 +347,7 @@ public class Follower extends AbstractRaftActorBehavior { // prevLogIndex entry does exist in the follower's log but it has // a different term in it - log.debug("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries" + log.info("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries" + "prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(), prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex()); @@ -356,12 +357,12 @@ public class Follower extends AbstractRaftActorBehavior { // This append entry comes from a leader who has it's log aggressively trimmed and so does not have // the previous entry in it's in-memory journal - log.debug("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the" + log.info("{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the" + " in-memory journal", logName(), appendEntries.getReplicatedToAllIndex()); } else if (appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1 && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0 && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)) { - log.debug("{}: Cannot append entries because the calculated previousIndex {} was not found in the " + log.info("{}: Cannot append entries because the calculated previousIndex {} was not found in the " + " in-memory journal", logName(), appendEntries.getEntries().get(0).getIndex() - 1); } else { outOfSync = false; @@ -397,7 +398,7 @@ public class Follower extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - log.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term", + log.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term", logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); context.getTermInformation().updateAndPersist(rpc.getTerm(), null); @@ -518,7 +519,8 @@ public class Follower extends AbstractRaftActorBehavior { leaderId = installSnapshot.getLeaderId(); if (snapshotTracker == null) { - snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId()); + snapshotTracker = new SnapshotTracker(log, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId(), + context); } updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId()); @@ -529,6 +531,9 @@ public class Follower extends AbstractRaftActorBehavior { if (snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(), installSnapshot.getLastChunkHashCode())) { + + log.info("{}: Snapshot installed from leader: {}", logName(), installSnapshot.getLeaderId()); + Snapshot snapshot = Snapshot.create( context.getSnapshotManager().convertSnapshot(snapshotTracker.getSnapshotBytes()), new ArrayList<>(),