Fix intermittent failure in ClusterAdminRpcServiceTest.testModuleShardLeaderMovement 80/55080/3
authorTom Pantelis <tompantelis@gmail.com>
Sat, 15 Apr 2017 02:00:57 +0000 (22:00 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Sat, 15 Apr 2017 15:08:55 +0000 (15:08 +0000)
java.lang.AssertionError: Rpc failed with error: RpcError [message=leadership transfer failed, severity=ERROR, errorType=APPLICATION, tag=operation-failed, applicationTag=null, info=null, cause=org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException: Failed to transfer leadership to member-2-shard-cars-config_testModuleShardLeaderMovement. Follower is not ready to become leader]
  at org.opendaylight.controller.cluster.datastore.admin.ClusterAdminRpcServiceTest.verifySuccessfulRpcResult(ClusterAdminRpcServiceTest.java:461)
  at org.opendaylight.controller.cluster.datastore.admin.ClusterAdminRpcServiceTest.doMakeShardLeaderLocal(ClusterAdminRpcServiceTest.java:450)
  at org.opendaylight.controller.cluster.datastore.admin.ClusterAdminRpcServiceTest.testModuleShardLeaderMovement(ClusterAdminRpcServiceTest.java:263)

It failed when trying to make member-2 the leader for a couple reasons. One is that
member-2 hadn't yet received the MemberUp event for member-3 from akka clustering and
thus didn't have its address when it started the election and tried to send
RequestVote.

The second problem is a result of the first - since member-2 couldn't get a vote
from member-3, it needed the vote from member-1, which was in the process of stepping
down as leader. When member-1 received the RequestVote with the higher term, it
switched to Follower. Therefore member-2 didn't receive any votes for that election
term. The request to transfer leadership, which was issued on member-1, then timed out
and failed.

The wait period for the new leader to be elected is 2 sec. This was chosen b/c
originally leadership transfer was only used on shutdown and we don't want to
block shutdown for too long. However, when requesting leadership outside of shutdown,
we should wait at least one election timeout period (plus some cushion to take into
account the variance).

This alleviates the time out but it still failed sometimes if member-1 timed out
in the Follower state and started a new election before member-2 timed out in
Candidate state. member-1 would then win the election and grab leadership back.
To alleiviate this, it would be ideal if member-1 replied to the RequestVote from
member-2 prior to switching to Follower. Normally when it receives a RaftRPC with
a higher term, the Leader is supposed to immediately switch to Follower and not
process and reply to the RaftRPC, as per raft. However if it's in the process of
transferring leadership it makes sense to process the RequestVote and make every
effort to get the requesting node elected.

I also fixed a couple issues in the test code, mainly adding waitForMembersUp.

Change-Id: Ibb1b00f03065680fe1fd338c3d26161ec6336d5a
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java
opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java
opendaylight/md-sal/sal-cluster-admin-impl/src/test/resources/simplelogger.properties

index 70a0b86..e2bef2e 100644 (file)
@@ -121,8 +121,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private RaftActorServerConfigurationSupport serverConfigurationSupport;
 
-    private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
-
     private boolean shuttingDown;
 
     protected RaftActor(String id, Map<String, String> peerAddresses,
@@ -309,7 +307,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                                         + ". Follower is not ready to become leader")),
                         getSelf());
             }
-        }, message.getRequestedFollowerId());
+        }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
     }
 
     private boolean possiblyHandleBehaviorMessage(final Object message) {
@@ -328,30 +326,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return false;
     }
 
-    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
-        initiateLeadershipTransfer(onComplete, null);
-    }
-
     private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
-                                            final String followerId) {
+            @Nullable final String followerId, long newLeaderTimeoutInMillis) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         if (leadershipTransferInProgress == null) {
             leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+            leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
                 public void onSuccess(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
 
                 @Override
                 public void onFailure(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
             });
 
             leadershipTransferInProgress.addOnComplete(onComplete);
+
+            context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
             leadershipTransferInProgress.init();
+
         } else {
             LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
             leadershipTransferInProgress.addOnComplete(onComplete);
@@ -387,7 +386,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
                 }
-            });
+            }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
         } else {
             pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
                 @Override
@@ -517,6 +516,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
 
+            RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+                    context.getRaftActorLeadershipTransferCohort();
             if (leadershipTransferInProgress != null) {
                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
             }
@@ -655,6 +656,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     private boolean isLeadershipTransferInProgress() {
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
     }
 
@@ -908,7 +910,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         initializeBehavior();
                     }
                 }
-            });
+            }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
         }
     }
 
index 1d59ed1..0fe6cf1 100644 (file)
@@ -335,4 +335,19 @@ public interface RaftActorContext {
      */
     @Nonnull
     FileBackedOutputStream newFileBackedOutputStream();
+
+    /**
+     * Returns the RaftActorLeadershipTransferCohort if leadership transfer is in progress.
+     *
+     * @return the RaftActorLeadershipTransferCohort if leadership transfer is in progress, null otherwise
+     */
+    @Nullable
+    RaftActorLeadershipTransferCohort getRaftActorLeadershipTransferCohort();
+
+    /**
+     * Sets the RaftActorLeadershipTransferCohort for transferring leadership.
+     *
+     * @param leadershipTransferCohort the RaftActorLeadershipTransferCohort or null to clear the existing one
+     */
+    void setRaftActorLeadershipTransferCohort(@Nullable RaftActorLeadershipTransferCohort leadershipTransferCohort);
 }
index b307195..5b130db 100644 (file)
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -88,6 +89,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final Consumer<ApplyState> applyStateConsumer;
 
+    private RaftActorLeadershipTransferCohort leadershipTransferCohort;
+
     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
             @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
             @Nonnull Map<String, String> peerAddresses,
@@ -416,4 +419,16 @@ public class RaftActorContextImpl implements RaftActorContext {
             }
         }
     }
+
+    @Override
+    @Nullable
+    public RaftActorLeadershipTransferCohort getRaftActorLeadershipTransferCohort() {
+        return leadershipTransferCohort;
+    }
+
+    @Override
+    public void setRaftActorLeadershipTransferCohort(
+            @Nullable RaftActorLeadershipTransferCohort leadershipTransferCohort) {
+        this.leadershipTransferCohort = leadershipTransferCohort;
+    }
 }
index 59ae4d3..9ee4c73 100644 (file)
@@ -54,6 +54,8 @@ import scala.concurrent.duration.FiniteDuration;
 public class RaftActorLeadershipTransferCohort {
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
 
+    static final long USE_DEFAULT_LEADER_TIMEOUT = -1;
+
     private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
     private final Stopwatch transferTimer = Stopwatch.createUnstarted();
     private final RaftActor raftActor;
@@ -70,6 +72,13 @@ public class RaftActorLeadershipTransferCohort {
     RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) {
         this.raftActor = raftActor;
         this.requestedFollowerId = requestedFollowerId;
+
+        // We'll wait an election timeout period for a new leader to be elected plus some cushion to take into
+        // account the variance.
+        final long electionTimeout = raftActor.getRaftActorContext().getConfigParams()
+                .getElectionTimeOutInterval().toMillis();
+        final int variance = raftActor.getRaftActorContext().getConfigParams().getElectionTimeVariance();
+        newLeaderTimeoutInMillis = electionTimeout + variance * 2;
     }
 
     void init() {
@@ -141,9 +150,8 @@ public class RaftActorLeadershipTransferCohort {
         // and convert to follower due to higher term. We should then get an AppendEntries heart
         // beat with the new leader id.
 
-        // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
-        // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
-        // safely run on the actor's thread dispatcher.
+        // Add a timer in case we don't get a leader change. Note: the Runnable is sent as a message to the raftActor
+        // which executes it safely run on the actor's thread dispatcher.
         FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
         newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
             (Runnable) () -> {
@@ -189,9 +197,10 @@ public class RaftActorLeadershipTransferCohort {
         return isTransferring;
     }
 
-    @VisibleForTesting
     void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
-        this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+        if (newLeaderTimeoutInMillis != USE_DEFAULT_LEADER_TIMEOUT) {
+            this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+        }
     }
 
     public Optional<String> getRequestedFollowerId() {
index d855507..097f0ec 100644 (file)
@@ -45,6 +45,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
@@ -448,6 +449,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
 
+                // This is a special case. Normally when stepping down as leader we don't process and reply to the
+                // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
+                // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
+                // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
+                // state and starting a new election and grabbing leadership back before the other candidate node can
+                // start a new election due to lack of responses. This case would only occur if there isn't a majority
+                // of other nodes available that can elect the requesting candidate. Since we're transferring
+                // leadership, we should make every effort to get the requesting node elected.
+                if (message instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+                    log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
+                    super.handleMessage(sender, message);
+                }
+
                 return internalSwitchBehavior(RaftState.Follower);
             }
         }
index 7ed4595..1542a33 100644 (file)
@@ -17,8 +17,10 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -31,6 +33,10 @@ import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -138,6 +144,16 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
     private void createRaftActors() {
         testLog.info("createRaftActors starting");
 
+        final Snapshot snapshot = Snapshot.create(EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
+                1, null, new org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload(
+                        Arrays.asList(new ServerInfo(leaderId, true), new ServerInfo(follower1Id, true),
+                                new ServerInfo(follower2Id, true), new ServerInfo(follower3Id, false))));
+
+        InMemorySnapshotStore.addSnapshot(leaderId, snapshot);
+        InMemorySnapshotStore.addSnapshot(follower1Id, snapshot);
+        InMemorySnapshotStore.addSnapshot(follower2Id, snapshot);
+        InMemorySnapshotStore.addSnapshot(follower3Id, snapshot);
+
         follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
                 factory.generateActorId(follower1Id + "-notifier"));
         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
@@ -177,7 +193,6 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
 
         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
-        leaderContext.getPeerInfo(follower3Id).setVotingState(VotingState.NON_VOTING);
 
         waitUntilLeader(leaderActor);
 
@@ -283,6 +298,7 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
     }
 
+
     @Test
     public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
@@ -302,4 +318,24 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
 
         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
     }
+
+    @Test
+    public void testRequestLeadershipTransferToFollower2WithOtherFollowersDown() throws Exception {
+        testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown starting");
+
+        createRaftActors();
+        createRequestLeadershipResultCollectorActor();
+
+        factory.killActor(follower1Actor, new JavaTestKit(getSystem()));
+        factory.killActor(follower3Actor, new JavaTestKit(getSystem()));
+
+        sendFollower2RequestLeadershipTransferToLeader();
+
+        expectFirstMatching(requestLeadershipResultCollectorActor, Status.Success.class);
+
+        verifyRaftState(follower2Actor, RaftState.Leader);
+        verifyRaftState(leaderActor, RaftState.Follower);
+
+        testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown ending");
+    }
 }
index ab85331..0f2e812 100644 (file)
@@ -186,12 +186,13 @@ public class ClusterAdminRpcService implements ClusterAdminService {
             return newFailedRpcResultFuture("A valid DataStoreType must be specified");
         }
 
-        LOG.info("Moving leader to local node for shard {}, datastoreType {}", shardName, dataStoreType);
-
         ActorContext actorContext = dataStoreType == DataStoreType.Config
                 ? configDataStore.getActorContext()
                 : operDataStore.getActorContext();
 
+        LOG.info("Moving leader to local node {} for shard {}, datastoreType {}",
+                actorContext.getCurrentMemberName().getName(), shardName, dataStoreType);
+
         final scala.concurrent.Future<ActorRef> localShardReply =
                 actorContext.findLocalShardAsync(shardName);
 
@@ -222,7 +223,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                     return;
                 }
 
-                LOG.debug("Leadership transfer complete {}.", success);
+                LOG.debug("Leadership transfer complete");
                 future.set(RpcResultBuilder.<Void>success().build());
             }
         }, actorContext.getClientDispatcher());
index 90c6cd1..6fc9940 100644 (file)
@@ -246,6 +246,8 @@ public class ClusterAdminRpcServiceTest {
                 .moduleShardsConfig(moduleShardsConfig).build();
 
         member1.waitForMembersUp("member-2", "member-3");
+        replicaNode2.waitForMembersUp("member-1");
+        replicaNode3.waitForMembersUp("member-1", "member-2");
 
         doAddShardReplica(replicaNode2, "cars", "member-1");
         doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
@@ -257,13 +259,18 @@ public class ClusterAdminRpcServiceTest {
         verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
 
         doMakeShardLeaderLocal(member1, "cars", "member-1");
-        replicaNode2.kit().waitUntilLeader(replicaNode2.configDataStore().getActorContext(), "cars");
-        replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars");
+        verifyRaftState(replicaNode2.configDataStore(), "cars",
+            raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
+        verifyRaftState(replicaNode3.configDataStore(), "cars",
+            raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
 
         doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
-        member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "cars");
-        replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars");
+        verifyRaftState(member1.configDataStore(), "cars",
+            raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
+        verifyRaftState(replicaNode3.configDataStore(), "cars",
+            raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
 
+        replicaNode2.waitForMembersUp("member-3");
         doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
     }
 
index 3173698..cb9c1bc 100644 (file)
@@ -4,7 +4,8 @@ org.slf4j.simpleLogger.logFile=System.out
 org.slf4j.simpleLogger.showShortLogName=true
 org.slf4j.simpleLogger.levelInBrackets=true
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
-org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.Shard=error
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.Shard=debug
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.utils.ActorContext=error
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.raft.RaftActorServerConfigurationSupport=debug
-org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
\ No newline at end of file
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.