Remove Helium protobuff code from AppendEntries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 497d98cdce13cded64be94696591ffdb171f22d3..2b94eb312b153867361878222001d932ad048d08 100644 (file)
@@ -140,6 +140,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     public void removeFollower(String followerId) {
         followerToLog.remove(followerId);
+        mapFollowerToSnapshot.remove(followerId);
     }
 
     public void updateMinReplicaCount() {
@@ -254,18 +255,26 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             int replicatedCount = 1;
 
             for (FollowerLogInformation info : followerToLog.values()) {
-                if ((info.getMatchIndex() >= N) && (context.getPeerInfo(followerId).isVoting())) {
+                final PeerInfo peerInfo = context.getPeerInfo(info.getId());
+                if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
                     replicatedCount++;
                 }
             }
 
             if (replicatedCount >= minReplicationCount) {
                 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
-                if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) {
-                    context.setCommitIndex(N);
-                } else {
+                if (replicatedLogEntry == null) {
                     break;
                 }
+
+                // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
+                // "Raft never commits log entries from previous terms by counting replicas".
+                // However we keep looping so we can make progress when new entries in the current term
+                // reach consensus, as per §5.4.1: "once an entry from the current term is committed by
+                // counting replicas, then all prior entries are committed indirectly".
+                if (replicatedLogEntry.getTerm() == currentTerm()) {
+                    context.setCommitIndex(N);
+                }
             } else {
                 break;
             }
@@ -509,7 +518,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
-    private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
+    protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
         // Send an AppendEntries to all followers
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
@@ -615,7 +624,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     appendEntries);
         }
 
-        followerActor.tell(appendEntries.toSerializable(), actor());
+        followerActor.tell(appendEntries, actor());
     }
 
     /**