Do not update term from unreachable members 76/82876/10
authorTomas Cere <tomas.cere@pantheon.tech>
Tue, 2 Jul 2019 07:58:13 +0000 (09:58 +0200)
committerRobert Varga <nite@hq.sk>
Fri, 27 Sep 2019 04:58:29 +0000 (04:58 +0000)
In case of a one way connection problem, a follower that cannot
receive messages but can send them, it will change into Candidate
and continuosly bump its term while it cannot receive any
RequestVoteReplies.

Since the messages it sends can actually get through, this means
that the healthy part of the cluster will have its leader dropped
every 2 election timeouts.

Change this up so we only update term when receiving a RequestVote
from a reachable node.

Change-Id: Ia8a0b59cfb2d0cd68096172a1d2d47f68e7ed473
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index 76b757b..32e2e09 100644 (file)
@@ -469,7 +469,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // If RPC request or response contains term T > currentTerm:
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
-            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) {
                 log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
                         logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
@@ -483,9 +483,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // start a new election due to lack of responses. This case would only occur if there isn't a majority
                 // of other nodes available that can elect the requesting candidate. Since we're transferring
                 // leadership, we should make every effort to get the requesting node elected.
-                if (message instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+                if (rpc instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
                     log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
-                    super.handleMessage(sender, message);
+                    super.handleMessage(sender, rpc);
                 }
 
                 return internalSwitchBehavior(RaftState.Follower);
index 2b28c37..3440de9 100644 (file)
@@ -11,8 +11,12 @@ import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -22,6 +26,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
@@ -502,4 +507,38 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected String getId() {
         return context.getId();
     }
+
+    // Check whether we should update the term. In case of half-connected nodes, we want to ignore RequestVote
+    // messages, as the candidate is not able to receive our response.
+    protected boolean shouldUpdateTerm(final RaftRPC rpc) {
+        if (!(rpc instanceof RequestVote)) {
+            return true;
+        }
+
+        final RequestVote requestVote = (RequestVote) rpc;
+        log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName());
+        final Optional<Cluster> maybeCluster = context.getCluster();
+        if (!maybeCluster.isPresent()) {
+            return true;
+        }
+
+        final Cluster cluster = maybeCluster.get();
+
+        final Set<Member> unreachable = cluster.state().getUnreachable();
+        log.debug("{}: Cluster state: {}", logName(), unreachable);
+
+        for (Member member : unreachable) {
+            for (String role : member.getRoles()) {
+                if (requestVote.getCandidateId().startsWith(role)) {
+                    log.debug("{}: Unreachable member: {}, matches candidateId in: {}, not updating term", logName(),
+                        member, requestVote);
+                    return false;
+                }
+            }
+        }
+
+        log.debug("{}: Candidate in requestVote:{} with higher term appears reachable, updating term.", logName(),
+            requestVote);
+        return true;
+    }
 }
index b8d229d..d88c30d 100644 (file)
@@ -454,7 +454,7 @@ public class Follower extends AbstractRaftActorBehavior {
         // If RPC request or response contains term T > currentTerm:
         // set currentTerm = T, convert to follower (§5.1)
         // This applies to all RPC messages and responses
-        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) {
             log.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
                 logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
index 05e4d69..045083f 100644 (file)
     <dependency>
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
-      <version>3.1.6</version>
       <scope>test</scope>
     </dependency>
     <dependency>
index 5abadcc..94813e8 100644 (file)
@@ -24,6 +24,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
+import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import akka.testkit.javadsl.TestKit;
@@ -81,7 +82,10 @@ import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardData
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -1250,6 +1254,60 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
+    @Test
+    public void testSemiReachableCandidateNotDroppingLeader() throws Exception {
+        final String testName = "testSemiReachableCandidateNotDroppingLeader";
+        initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
+
+        final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
+                .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
+        final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
+                follower2System, follower2DatastoreContextBuilder, commitTimeout);
+
+        final AbstractDataStore ds2 =
+                     follower2TestKit.setupAbstractDataStore(
+                             testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
+
+        followerTestKit.waitForMembersUp("member-1", "member-3");
+        follower2TestKit.waitForMembersUp("member-1", "member-2");
+
+        TestKit.shutdownActorSystem(follower2System);
+
+        ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+        OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+                .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+
+        Cluster leaderCluster = Cluster.get(leaderSystem);
+        Cluster followerCluster = Cluster.get(followerSystem);
+        Cluster follower2Cluster = Cluster.get(follower2System);
+
+        Member follower2Member = follower2Cluster.readView().self();
+
+        await().atMost(10, TimeUnit.SECONDS)
+                .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member));
+        await().atMost(10, TimeUnit.SECONDS)
+                .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member));
+
+        ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+
+        // to simulate a follower not being able to receive messages, but still being able to send messages and becoming
+        // candidate, we can just send a couple of RequestVotes to both leader and follower.
+        cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
+        followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
+        cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
+        followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
+
+        OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+                .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+        OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils()
+                .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+
+        assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm());
+        assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm());
+
+        ds2.close();
+    }
+
     @Test
     public void testInstallSnapshot() throws Exception {
         final String testName = "testInstallSnapshot";