From 7aaff9ef1da193ee421541db1a5b57a7cbf51fb2 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Tue, 2 Jul 2019 09:58:13 +0200 Subject: [PATCH] Do not update term from unreachable members In case of a one way connection problem, a follower that cannot receive messages but can send them, it will change into Candidate and continuosly bump its term while it cannot receive any RequestVoteReplies. Since the messages it sends can actually get through, this means that the healthy part of the cluster will have its leader dropped every 2 election timeouts. Change this up so we only update term when receiving a RequestVote from a reachable node. Change-Id: Ia8a0b59cfb2d0cd68096172a1d2d47f68e7ed473 Signed-off-by: Tomas Cere Signed-off-by: Robert Varga --- .../raft/behaviors/AbstractLeader.java | 6 +- .../behaviors/AbstractRaftActorBehavior.java | 39 +++++++++++++ .../cluster/raft/behaviors/Follower.java | 2 +- .../md-sal/sal-distributed-datastore/pom.xml | 1 - ...butedDataStoreRemotingIntegrationTest.java | 58 +++++++++++++++++++ 5 files changed, 101 insertions(+), 5 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 76b757be37..32e2e09c3f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -469,7 +469,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // If RPC request or response contains term T > currentTerm: // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) { log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); @@ -483,9 +483,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // start a new election due to lack of responses. This case would only occur if there isn't a majority // of other nodes available that can elect the requesting candidate. Since we're transferring // leadership, we should make every effort to get the requesting node elected. - if (message instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) { + if (rpc instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) { log.debug("{}: Leadership transfer in progress - processing RequestVote", logName()); - super.handleMessage(sender, message); + super.handleMessage(sender, rpc); } return internalSwitchBehavior(RaftState.Follower); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 2b28c37248..3440de9dd9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -11,8 +11,12 @@ import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; import akka.actor.Cancellable; +import akka.cluster.Cluster; +import akka.cluster.Member; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Optional; import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; @@ -22,6 +26,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; @@ -502,4 +507,38 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected String getId() { return context.getId(); } + + // Check whether we should update the term. In case of half-connected nodes, we want to ignore RequestVote + // messages, as the candidate is not able to receive our response. + protected boolean shouldUpdateTerm(final RaftRPC rpc) { + if (!(rpc instanceof RequestVote)) { + return true; + } + + final RequestVote requestVote = (RequestVote) rpc; + log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName()); + final Optional maybeCluster = context.getCluster(); + if (!maybeCluster.isPresent()) { + return true; + } + + final Cluster cluster = maybeCluster.get(); + + final Set unreachable = cluster.state().getUnreachable(); + log.debug("{}: Cluster state: {}", logName(), unreachable); + + for (Member member : unreachable) { + for (String role : member.getRoles()) { + if (requestVote.getCandidateId().startsWith(role)) { + log.debug("{}: Unreachable member: {}, matches candidateId in: {}, not updating term", logName(), + member, requestVote); + return false; + } + } + } + + log.debug("{}: Candidate in requestVote:{} with higher term appears reachable, updating term.", logName(), + requestVote); + return true; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index b8d229d84c..d88c30db33 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -454,7 +454,7 @@ public class Follower extends AbstractRaftActorBehavior { // If RPC request or response contains term T > currentTerm: // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) { log.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term", logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index 05e4d696d4..045083f7d9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -211,7 +211,6 @@ org.awaitility awaitility - 3.1.6 test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 5abadccd50..94813e8499 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -24,6 +24,7 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; +import akka.cluster.Member; import akka.dispatch.Futures; import akka.pattern.Patterns; import akka.testkit.javadsl.TestKit; @@ -81,7 +82,10 @@ import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardData import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -1250,6 +1254,60 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } + @Test + public void testSemiReachableCandidateNotDroppingLeader() throws Exception { + final String testName = "testSemiReachableCandidateNotDroppingLeader"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10); + final IntegrationTestKit follower2TestKit = new IntegrationTestKit( + follower2System, follower2DatastoreContextBuilder, commitTimeout); + + final AbstractDataStore ds2 = + follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS); + + followerTestKit.waitForMembersUp("member-1", "member-3"); + follower2TestKit.waitForMembersUp("member-1", "member-2"); + + TestKit.shutdownActorSystem(follower2System); + + ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + + Cluster leaderCluster = Cluster.get(leaderSystem); + Cluster followerCluster = Cluster.get(followerSystem); + Cluster follower2Cluster = Cluster.get(follower2System); + + Member follower2Member = follower2Cluster.readView().self(); + + await().atMost(10, TimeUnit.SECONDS) + .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member)); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member)); + + ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + + // to simulate a follower not being able to receive messages, but still being able to send messages and becoming + // candidate, we can just send a couple of RequestVotes to both leader and follower. + cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null); + followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null); + cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null); + followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null); + + OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + + assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm()); + assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm()); + + ds2.close(); + } + @Test public void testInstallSnapshot() throws Exception { final String testName = "testInstallSnapshot"; -- 2.36.6