Send leader's full address via AppendEntries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index dc9cb23facac0aaab31b463105db136de7ea31a2..2175eb75557d743cbe09020f9408035f69e31c75 100644 (file)
@@ -34,7 +34,6 @@ import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -116,7 +115,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             trackers.addAll(initializeFromLeader.trackers);
         } else {
             for (PeerInfo peerInfo: context.getPeers()) {
-                FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
+                FollowerLogInformation followerLogInformation = new FollowerLogInformation(peerInfo, context);
                 followerToLog.put(peerInfo.getId(), followerLogInformation);
             }
         }
@@ -149,8 +148,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     public void addFollower(final String followerId) {
-        FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(
-                context.getPeerInfo(followerId), -1, context);
+        FollowerLogInformation followerLogInformation = new FollowerLogInformation(context.getPeerInfo(followerId),
+            context);
         followerToLog.put(followerId, followerLogInformation);
 
         if (heartbeatSchedule == null) {
@@ -217,20 +216,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return this;
         }
 
-        if (followerLogInformation.timeSinceLastActivity()
-                > context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
+        final long lastActivityNanos = followerLogInformation.nanosSinceLastActivity();
+        if (lastActivityNanos > context.getConfigParams().getElectionTimeOutInterval().toNanos()) {
             log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, "
                     + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
-                    logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(),
+                    logName(), appendEntriesReply, TimeUnit.NANOSECONDS.toMillis(lastActivityNanos),
                     context.getLastApplied(), context.getCommitIndex());
         }
 
         followerLogInformation.markFollowerActive();
         followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
         followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
+        followerLogInformation.setNeedsLeaderAddress(appendEntriesReply.isNeedsLeaderAddress());
 
         long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
-        long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
         boolean updated = false;
         if (appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
             // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
@@ -247,9 +246,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // However in this case the log terms won't match and the logs will conflict - this is handled
             // elsewhere.
             log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
-                    + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
-                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
-                    context.getReplicatedLog().getSnapshotIndex());
+                    + "(snapshotIndex {}, snapshotTerm {}) - forcing install snaphot", logName(),
+                    followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
+                    context.getReplicatedLog().lastIndex(), context.getReplicatedLog().getSnapshotIndex(),
+                    context.getReplicatedLog().getSnapshotTerm());
 
             followerLogInformation.setMatchIndex(-1);
             followerLogInformation.setNextIndex(-1);
@@ -258,6 +258,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             updated = true;
         } else if (appendEntriesReply.isSuccess()) {
+            long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
             if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0
                     && followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
                 // The follower's last entry is present in the leader's journal but the terms don't match so the
@@ -279,9 +280,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
             }
         } else {
-            log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
-                    logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
+            log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}, "
+                    + "snapshotTerm: {}, replicatedToAllIndex: {}", logName(), appendEntriesReply,
+                    context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(),
+                    getReplicatedToAllIndex());
 
+            long followersLastLogTermInLeadersLogOrSnapshot = getLogEntryOrSnapshotTerm(followerLastLogIndex);
             if (appendEntriesReply.isForceInstallSnapshot()) {
                 // Reset the followers match and next index. This is to signal that this follower has nothing
                 // in common with this Leader and so would require a snapshot to be installed
@@ -290,12 +294,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 // Force initiate a snapshot capture
                 initiateCaptureSnapshot(followerId);
-            } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0
-                    && followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
-                // The follower's log is empty or the last entry is present in the leader's journal
-                // and the terms match so the follower is just behind the leader's journal from
-                // the last snapshot, if any. We'll catch up the follower quickly by starting at the
-                // follower's last log index.
+            } else if (followerLastLogIndex < 0 || followersLastLogTermInLeadersLogOrSnapshot >= 0
+                    && followersLastLogTermInLeadersLogOrSnapshot == appendEntriesReply.getLogLastTerm()) {
+                // The follower's log is empty or the follower's last entry is present in the leader's journal or
+                // snapshot and the terms match so the follower is just behind the leader's journal from the last
+                // snapshot, if any. We'll catch up the follower quickly by starting at the follower's last log index.
 
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
 
@@ -311,7 +314,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                     log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
                             logName(), followerId, appendEntriesReply.getLogLastTerm(),
-                            followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+                            followersLastLogTermInLeadersLogOrSnapshot, followerLogInformation.getNextIndex());
                 }
             }
         }
@@ -630,14 +633,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
-    protected void sendAppendEntries(final long timeSinceLastActivityInterval, final boolean isHeartbeat) {
+    protected void sendAppendEntries(final long timeSinceLastActivityIntervalNanos, final boolean isHeartbeat) {
         // Send an AppendEntries to all followers
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
             final FollowerLogInformation followerLogInformation = e.getValue();
             // This checks helps not to send a repeat message to the follower
             if (!followerLogInformation.isFollowerActive()
-                    || followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
+                    || followerLogInformation.nanosSinceLastActivity() >= timeSinceLastActivityIntervalNanos) {
                 sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat);
             }
         }
@@ -814,7 +817,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             getLogEntryIndex(followerNextIndex - 1),
             getLogEntryTerm(followerNextIndex - 1), entries,
-            leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
+            leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion(),
+            followerLogInformation.getRaftVersion(), followerLogInformation.needsLeaderAddress(getId()));
 
         if (!entries.isEmpty() || log.isTraceEnabled()) {
             log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
@@ -950,7 +954,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void sendHeartBeat() {
         if (!followerToLog.isEmpty()) {
             log.trace("{}: Sending heartbeat", logName());
-            sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
+            sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toNanos(), true);
 
             appendEntriesMessageSlicer.checkExpiredSlicedMessageState();
         }