Add more debug output in AbstractLeader and Follower
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 497d98cdce13cded64be94696591ffdb171f22d3..0d15e756354a9cb541e24f6b5e8e242fc5fee0e8 100644 (file)
@@ -140,6 +140,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     public void removeFollower(String followerId) {
         followerToLog.remove(followerId);
+        mapFollowerToSnapshot.remove(followerId);
     }
 
     public void updateMinReplicaCount() {
@@ -250,23 +251,51 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         // If there exists an N such that N > commitIndex, a majority
         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
         // set commitIndex = N (§5.3, §5.4).
+        if(LOG.isTraceEnabled()) {
+            LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
+                    logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
+        }
+
         for (long N = context.getCommitIndex() + 1; ; N++) {
             int replicatedCount = 1;
 
+            LOG.trace("{}: checking Nth index {}", logName(), N);
             for (FollowerLogInformation info : followerToLog.values()) {
-                if ((info.getMatchIndex() >= N) && (context.getPeerInfo(followerId).isVoting())) {
+                final PeerInfo peerInfo = context.getPeerInfo(info.getId());
+                if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
                     replicatedCount++;
+                } else if(LOG.isDebugEnabled()) {
+                    LOG.debug("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
+                            info.getMatchIndex(), peerInfo);
                 }
             }
 
+            if(LOG.isTraceEnabled()) {
+                LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount);
+            }
+
             if (replicatedCount >= minReplicationCount) {
                 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
-                if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) {
+                if (replicatedLogEntry == null) {
+                    LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+                            logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size());
+                    break;
+                }
+
+                // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
+                // "Raft never commits log entries from previous terms by counting replicas".
+                // However we keep looping so we can make progress when new entries in the current term
+                // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
+                // counting replicas, then all prior entries are committed indirectly".
+                if (replicatedLogEntry.getTerm() == currentTerm()) {
+                    LOG.trace("{}: Setting commit index to {}", logName(), N);
                     context.setCommitIndex(N);
                 } else {
-                    break;
+                    LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}",
+                            logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
                 }
             } else {
+                LOG.trace("{}: minReplicationCount not reached - breaking", logName());
                 break;
             }
         }
@@ -509,7 +538,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
-    private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
+    protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
         // Send an AppendEntries to all followers
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();