Add voting state to shard mbean FollowerInfo
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index d914154f8bb5565c64836f7f97282a7390aa37e4..a1ec4d883135f42d88b9d08db82ba49acb1816ba 100644 (file)
@@ -25,6 +25,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Queue;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
@@ -84,13 +85,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
     private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
-    private Cancellable heartbeatSchedule = null;
-
-    private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
-
-    private int minReplicationCount;
+    /**
+     * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
+     * expect the entries to be modified in sequence, hence we open-code the lookup.
+     *
+     * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
+     *       but we already expect those to be far from frequent.
+     */
+    private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
+    private Cancellable heartbeatSchedule = null;
     private Optional<SnapshotHolder> snapshot;
+    private int minReplicationCount;
 
     protected AbstractLeader(RaftActorContext context, RaftState state) {
         super(context, state);
@@ -249,19 +255,34 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         // If there exists an N such that N > commitIndex, a majority
         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
         // set commitIndex = N (§5.3, §5.4).
+        if(LOG.isTraceEnabled()) {
+            LOG.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
+                    logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
+        }
+
         for (long N = context.getCommitIndex() + 1; ; N++) {
             int replicatedCount = 1;
 
+            LOG.trace("{}: checking Nth index {}", logName(), N);
             for (FollowerLogInformation info : followerToLog.values()) {
                 final PeerInfo peerInfo = context.getPeerInfo(info.getId());
                 if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
                     replicatedCount++;
+                } else if(LOG.isTraceEnabled()) {
+                    LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
+                            info.getMatchIndex(), peerInfo);
                 }
             }
 
+            if(LOG.isTraceEnabled()) {
+                LOG.trace("{}: replicatedCount {}, minReplicationCount: {}", logName(), replicatedCount, minReplicationCount);
+            }
+
             if (replicatedCount >= minReplicationCount) {
                 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
                 if (replicatedLogEntry == null) {
+                    LOG.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+                            logName(), N, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().size());
                     break;
                 }
 
@@ -271,9 +292,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
                 // counting replicas, then all prior entries are committed indirectly".
                 if (replicatedLogEntry.getTerm() == currentTerm()) {
+                    LOG.trace("{}: Setting commit index to {}", logName(), N);
                     context.setCommitIndex(N);
+                } else {
+                    LOG.debug("{}: Not updating commit index to {} - retrieved log entry with index {}, term {} does not match the current term {}",
+                            logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm());
                 }
             } else {
+                LOG.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount);
                 break;
             }
         }
@@ -325,7 +351,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-        final Iterator<ClientRequestTracker> it = trackerList.iterator();
+        final Iterator<ClientRequestTracker> it = trackers.iterator();
         while (it.hasNext()) {
             final ClientRequestTracker t = it.next();
             if (t.getIndex() == logIndex) {
@@ -337,16 +363,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return null;
     }
 
-    @Override
-    protected ClientRequestTracker findClientRequestTracker(long logIndex) {
-        for (ClientRequestTracker tracker : trackerList) {
-            if (tracker.getIndex() == logIndex) {
-                return tracker;
-            }
-        }
-        return null;
-    }
-
     @Override
     protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply) {
@@ -493,7 +509,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // Create a tracker entry we will use this later to notify the
         // client actor
-        trackerList.add(
+        trackers.add(
             new ClientRequestTrackerImpl(replicate.getClientActor(),
                 replicate.getIdentifier(),
                 logIndex)
@@ -779,10 +795,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     protected boolean isLeaderIsolated() {
         int minPresent = getMinIsolatedLeaderPeerCount();
         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
-            if (followerLogInformation.isFollowerActive()) {
+            final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+            if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
                 --minPresent;
                 if (minPresent == 0) {
-                    break;
+                    return false;
                 }
             }
         }