Do not update term from unreachable members
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractRaftActorBehavior.java
index 2b28c372488709646f96c3db5d31a7e9e267ef1e..3440de9dd9bc808c232bbcff54808940aa83667e 100644 (file)
@@ -11,8 +11,12 @@ import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -22,6 +26,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
@@ -502,4 +507,38 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected String getId() {
         return context.getId();
     }
+
+    // Check whether we should update the term. In case of half-connected nodes, we want to ignore RequestVote
+    // messages, as the candidate is not able to receive our response.
+    protected boolean shouldUpdateTerm(final RaftRPC rpc) {
+        if (!(rpc instanceof RequestVote)) {
+            return true;
+        }
+
+        final RequestVote requestVote = (RequestVote) rpc;
+        log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName());
+        final Optional<Cluster> maybeCluster = context.getCluster();
+        if (!maybeCluster.isPresent()) {
+            return true;
+        }
+
+        final Cluster cluster = maybeCluster.get();
+
+        final Set<Member> unreachable = cluster.state().getUnreachable();
+        log.debug("{}: Cluster state: {}", logName(), unreachable);
+
+        for (Member member : unreachable) {
+            for (String role : member.getRoles()) {
+                if (requestVote.getCandidateId().startsWith(role)) {
+                    log.debug("{}: Unreachable member: {}, matches candidateId in: {}, not updating term", logName(),
+                        member, requestVote);
+                    return false;
+                }
+            }
+        }
+
+        log.debug("{}: Candidate in requestVote:{} with higher term appears reachable, updating term.", logName(),
+            requestVote);
+        return true;
+    }
 }