Do not update term from unreachable members 64/84764/1
authorTomas Cere <tomas.cere@pantheon.tech>
Tue, 2 Jul 2019 07:58:13 +0000 (09:58 +0200)
committerRobert Varga <nite@hq.sk>
Fri, 27 Sep 2019 04:58:43 +0000 (04:58 +0000)
In case of a one-way connection problem, a follower that cannot
receive messages but can still send them will change into Candidate
and continuously bump its term, because it never receives any
RequestVoteReplies.

Since the messages it sends can actually get through, this means
that the healthy part of the cluster will have its leader dropped
every 2 election timeouts.

Change this up so we only update term when receiving a RequestVote
from a reachable node.

Change-Id: Ia8a0b59cfb2d0cd68096172a1d2d47f68e7ed473
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 7aaff9ef1da193ee421541db1a5b57a7cbf51fb2)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index 76b757be37e176ca13d062e897edfbaa4029005b..32e2e09c3fe2b13482a4f02a2a460e5ef45643dd 100644 (file)
@@ -469,7 +469,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // If RPC request or response contains term T > currentTerm:
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
-            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) {
                 log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
                         logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
@@ -483,9 +483,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // start a new election due to lack of responses. This case would only occur if there isn't a majority
                 // of other nodes available that can elect the requesting candidate. Since we're transferring
                 // leadership, we should make every effort to get the requesting node elected.
-                if (message instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+                if (rpc instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
                     log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
-                    super.handleMessage(sender, message);
+                    super.handleMessage(sender, rpc);
                 }
 
                 return internalSwitchBehavior(RaftState.Follower);
index 2b28c372488709646f96c3db5d31a7e9e267ef1e..3440de9dd9bc808c232bbcff54808940aa83667e 100644 (file)
@@ -11,8 +11,12 @@ import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -22,6 +26,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
@@ -502,4 +507,38 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected String getId() {
         return context.getId();
     }
+
+    // Check whether we should update the term. In case of half-connected nodes, we want to ignore RequestVote
+    // messages, as the candidate is not able to receive our response.
+    protected boolean shouldUpdateTerm(final RaftRPC rpc) {
+        if (!(rpc instanceof RequestVote)) {
+            return true;
+        }
+
+        final RequestVote requestVote = (RequestVote) rpc;
+        log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName());
+        final Optional<Cluster> maybeCluster = context.getCluster();
+        if (!maybeCluster.isPresent()) {
+            return true;
+        }
+
+        final Cluster cluster = maybeCluster.get();
+
+        final Set<Member> unreachable = cluster.state().getUnreachable();
+        log.debug("{}: Cluster state: {}", logName(), unreachable);
+
+        for (Member member : unreachable) {
+            for (String role : member.getRoles()) {
+                if (requestVote.getCandidateId().startsWith(role)) {
+                    log.debug("{}: Unreachable member: {}, matches candidateId in: {}, not updating term", logName(),
+                        member, requestVote);
+                    return false;
+                }
+            }
+        }
+
+        log.debug("{}: Candidate in requestVote:{} with higher term appears reachable, updating term.", logName(),
+            requestVote);
+        return true;
+    }
 }
index b8d229d84ca3e607cc3cae1d1f7893a45124c543..d88c30db333bcf040b20febc76884c9bc13e9147 100644 (file)
@@ -454,7 +454,7 @@ public class Follower extends AbstractRaftActorBehavior {
         // If RPC request or response contains term T > currentTerm:
         // set currentTerm = T, convert to follower (§5.1)
         // This applies to all RPC messages and responses
-        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) {
             log.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
                 logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
index 80e992e1b1e7c7cb85cb6ae9e7e159eb5ae37a29..7308d95c4f8989a8d43c75f30b18bca275fa33e5 100644 (file)
     <dependency>
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
-      <version>3.1.6</version>
       <scope>test</scope>
     </dependency>
     <dependency>
index 5abadccd500d42745dde4c37998c2ee885a2a11e..94813e84991aef33c558e94b294dbce120dbe779 100644 (file)
@@ -24,6 +24,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
+import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import akka.testkit.javadsl.TestKit;
@@ -81,7 +82,10 @@ import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardData
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -1250,6 +1254,60 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
+    @Test
+    public void testSemiReachableCandidateNotDroppingLeader() throws Exception {
+        final String testName = "testSemiReachableCandidateNotDroppingLeader";
+        initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
+
+        final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
+                .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
+        final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
+                follower2System, follower2DatastoreContextBuilder, commitTimeout);
+
+        final AbstractDataStore ds2 =
+                     follower2TestKit.setupAbstractDataStore(
+                             testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
+
+        followerTestKit.waitForMembersUp("member-1", "member-3");
+        follower2TestKit.waitForMembersUp("member-1", "member-2");
+
+        TestKit.shutdownActorSystem(follower2System);
+
+        ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+        OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+                .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+
+        Cluster leaderCluster = Cluster.get(leaderSystem);
+        Cluster followerCluster = Cluster.get(followerSystem);
+        Cluster follower2Cluster = Cluster.get(follower2System);
+
+        Member follower2Member = follower2Cluster.readView().self();
+
+        await().atMost(10, TimeUnit.SECONDS)
+                .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member));
+        await().atMost(10, TimeUnit.SECONDS)
+                .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member));
+
+        ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+
+        // to simulate a follower not being able to receive messages, but still being able to send messages and becoming
+        // candidate, we can just send a couple of RequestVotes to both leader and follower.
+        cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
+        followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
+        cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
+        followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
+
+        OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+                .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+        OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils()
+                .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+
+        assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm());
+        assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm());
+
+        ds2.close();
+    }
+
     @Test
     public void testInstallSnapshot() throws Exception {
         final String testName = "testInstallSnapshot";