java.lang.AssertionError: Rpc failed with error: RpcError [message=leadership transfer failed, severity=ERROR, errorType=APPLICATION, tag=operation-failed, applicationTag=null, info=null, cause=org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException: Failed to transfer leadership to member-2-shard-cars-config_testModuleShardLeaderMovement. Follower is not ready to become leader]
at org.opendaylight.controller.cluster.datastore.admin.ClusterAdminRpcServiceTest.verifySuccessfulRpcResult(ClusterAdminRpcServiceTest.java:461)
at org.opendaylight.controller.cluster.datastore.admin.ClusterAdminRpcServiceTest.doMakeShardLeaderLocal(ClusterAdminRpcServiceTest.java:450)
at org.opendaylight.controller.cluster.datastore.admin.ClusterAdminRpcServiceTest.testModuleShardLeaderMovement(ClusterAdminRpcServiceTest.java:263)
It failed when trying to make member-2 the leader for a couple reasons. One is that
member-2 hadn't yet received the MemberUp event for member-3 from akka clustering and
thus didn't have its address when it started the election and tried to send
RequestVote.
The second problem is a result of the first - since member-2 couldn't get a vote
from member-3, it needed the vote from member-1, which was in the process of stepping
down as leader. When member-1 received the RequestVote with the higher term, it
switched to Follower. Therefore member-2 didn't receive any votes for that election
term. The request to transfer leadership, which was issued on member-1, then timed out
and failed.
The wait period for the new leader to be elected is 2 sec. This was chosen b/c
originally leadership transfer was only used on shutdown and we don't want to
block shutdown for too long. However, when requesting leadership outside of shutdown,
we should wait at least one election timeout period (plus some cushion to take into
account the variance).
This alleviates the time out but it still failed sometimes if member-1 timed out
in the Follower state and started a new election before member-2 timed out in
Candidate state. member-1 would then win the election and grab leadership back.
To alleiviate this, it would be ideal if member-1 replied to the RequestVote from
member-2 prior to switching to Follower. Normally when it receives a RaftRPC with
a higher term, the Leader is supposed to immediately switch to Follower and not
process and reply to the RaftRPC, as per raft. However if it's in the process of
transferring leadership it makes sense to process the RequestVote and make every
effort to get the requesting node elected.
I also fixed a couple issues in the test code, mainly adding waitForMembersUp.
Change-Id: Ibb1b00f03065680fe1fd338c3d26161ec6336d5a
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
private RaftActorServerConfigurationSupport serverConfigurationSupport;
- private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
-
private boolean shuttingDown;
protected RaftActor(String id, Map<String, String> peerAddresses,
+ ". Follower is not ready to become leader")),
getSelf());
}
- }, message.getRequestedFollowerId());
+ }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
}
private boolean possiblyHandleBehaviorMessage(final Object message) {
return false;
}
- private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
- initiateLeadershipTransfer(onComplete, null);
- }
-
private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
- final String followerId) {
+ @Nullable final String followerId, long newLeaderTimeoutInMillis) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
if (leadershipTransferInProgress == null) {
leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+ leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
public void onSuccess(ActorRef raftActorRef) {
- leadershipTransferInProgress = null;
+ context.setRaftActorLeadershipTransferCohort(null);
}
@Override
public void onFailure(ActorRef raftActorRef) {
- leadershipTransferInProgress = null;
+ context.setRaftActorLeadershipTransferCohort(null);
}
});
leadershipTransferInProgress.addOnComplete(onComplete);
+
+ context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
leadershipTransferInProgress.init();
+
} else {
LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
leadershipTransferInProgress.addOnComplete(onComplete);
LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
}
- });
+ }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
} else {
pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
@Override
onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+ context.getRaftActorLeadershipTransferCohort();
if (leadershipTransferInProgress != null) {
leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
}
}
private boolean isLeadershipTransferInProgress() {
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
}
initializeBehavior();
}
}
- });
+ }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
}
}
*/
@Nonnull
FileBackedOutputStream newFileBackedOutputStream();
+
+ /**
+ * Returns the RaftActorLeadershipTransferCohort if leadership transfer is in progress.
+ *
+ * @return the RaftActorLeadershipTransferCohort if leadership transfer is in progress, null otherwise
+ */
+ @Nullable
+ RaftActorLeadershipTransferCohort getRaftActorLeadershipTransferCohort();
+
+ /**
+ * Sets the RaftActorLeadershipTransferCohort for transferring leadership.
+ *
+ * @param leadershipTransferCohort the RaftActorLeadershipTransferCohort or null to clear the existing one
+ */
+ void setRaftActorLeadershipTransferCohort(@Nullable RaftActorLeadershipTransferCohort leadershipTransferCohort);
}
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
private final Consumer<ApplyState> applyStateConsumer;
+ private RaftActorLeadershipTransferCohort leadershipTransferCohort;
+
public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
@Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
@Nonnull Map<String, String> peerAddresses,
}
}
}
+
+ @Override
+ @Nullable
+ public RaftActorLeadershipTransferCohort getRaftActorLeadershipTransferCohort() {
+ return leadershipTransferCohort;
+ }
+
+ @Override
+ public void setRaftActorLeadershipTransferCohort(
+ @Nullable RaftActorLeadershipTransferCohort leadershipTransferCohort) {
+ this.leadershipTransferCohort = leadershipTransferCohort;
+ }
}
public class RaftActorLeadershipTransferCohort {
private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
+ static final long USE_DEFAULT_LEADER_TIMEOUT = -1;
+
private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
private final Stopwatch transferTimer = Stopwatch.createUnstarted();
private final RaftActor raftActor;
RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) {
this.raftActor = raftActor;
this.requestedFollowerId = requestedFollowerId;
+
+ // We'll wait an election timeout period for a new leader to be elected plus some cushion to take into
+ // account the variance.
+ final long electionTimeout = raftActor.getRaftActorContext().getConfigParams()
+ .getElectionTimeOutInterval().toMillis();
+ final int variance = raftActor.getRaftActorContext().getConfigParams().getElectionTimeVariance();
+ newLeaderTimeoutInMillis = electionTimeout + variance * 2;
}
void init() {
// and convert to follower due to higher term. We should then get an AppendEntries heart
// beat with the new leader id.
- // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
- // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
- // safely run on the actor's thread dispatcher.
+ // Add a timer in case we don't get a leader change. Note: the Runnable is sent as a message to the raftActor
+ // which executes it safely run on the actor's thread dispatcher.
FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
(Runnable) () -> {
return isTransferring;
}
- @VisibleForTesting
void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
- this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+ if (newLeaderTimeoutInMillis != USE_DEFAULT_LEADER_TIMEOUT) {
+ this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+ }
}
public Optional<String> getRequestedFollowerId() {
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ // This is a special case. Normally when stepping down as leader we don't process and reply to the
+ // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
+ // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
+ // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
+ // state and starting a new election and grabbing leadership back before the other candidate node can
+ // start a new election due to lack of responses. This case would only occur if there isn't a majority
+ // of other nodes available that can elect the requesting candidate. Since we're transferring
+ // leadership, we should make every effort to get the requesting node elected.
+ if (message instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+ log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
+ super.handleMessage(sender, message);
+ }
+
return internalSwitchBehavior(RaftState.Follower);
}
}
import akka.actor.Props;
import akka.actor.Status;
import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import scala.concurrent.Await;
import scala.concurrent.Future;
private void createRaftActors() {
testLog.info("createRaftActors starting");
+ final Snapshot snapshot = Snapshot.create(EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
+ 1, null, new org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload(
+ Arrays.asList(new ServerInfo(leaderId, true), new ServerInfo(follower1Id, true),
+ new ServerInfo(follower2Id, true), new ServerInfo(follower3Id, false))));
+
+ InMemorySnapshotStore.addSnapshot(leaderId, snapshot);
+ InMemorySnapshotStore.addSnapshot(follower1Id, snapshot);
+ InMemorySnapshotStore.addSnapshot(follower2Id, snapshot);
+ InMemorySnapshotStore.addSnapshot(follower3Id, snapshot);
+
follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
factory.generateActorId(follower1Id + "-notifier"));
follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
leaderContext = leaderActor.underlyingActor().getRaftActorContext();
- leaderContext.getPeerInfo(follower3Id).setVotingState(VotingState.NON_VOTING);
waitUntilLeader(leaderActor);
testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
}
+
@Test
public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
}
+
+ @Test
+ public void testRequestLeadershipTransferToFollower2WithOtherFollowersDown() throws Exception {
+ testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown starting");
+
+ createRaftActors();
+ createRequestLeadershipResultCollectorActor();
+
+ factory.killActor(follower1Actor, new JavaTestKit(getSystem()));
+ factory.killActor(follower3Actor, new JavaTestKit(getSystem()));
+
+ sendFollower2RequestLeadershipTransferToLeader();
+
+ expectFirstMatching(requestLeadershipResultCollectorActor, Status.Success.class);
+
+ verifyRaftState(follower2Actor, RaftState.Leader);
+ verifyRaftState(leaderActor, RaftState.Follower);
+
+ testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown ending");
+ }
}
return newFailedRpcResultFuture("A valid DataStoreType must be specified");
}
- LOG.info("Moving leader to local node for shard {}, datastoreType {}", shardName, dataStoreType);
-
ActorContext actorContext = dataStoreType == DataStoreType.Config
? configDataStore.getActorContext()
: operDataStore.getActorContext();
+ LOG.info("Moving leader to local node {} for shard {}, datastoreType {}",
+ actorContext.getCurrentMemberName().getName(), shardName, dataStoreType);
+
final scala.concurrent.Future<ActorRef> localShardReply =
actorContext.findLocalShardAsync(shardName);
return;
}
- LOG.debug("Leadership transfer complete {}.", success);
+ LOG.debug("Leadership transfer complete");
future.set(RpcResultBuilder.<Void>success().build());
}
}, actorContext.getClientDispatcher());
.moduleShardsConfig(moduleShardsConfig).build();
member1.waitForMembersUp("member-2", "member-3");
+ replicaNode2.waitForMembersUp("member-1");
+ replicaNode3.waitForMembersUp("member-1", "member-2");
doAddShardReplica(replicaNode2, "cars", "member-1");
doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
doMakeShardLeaderLocal(member1, "cars", "member-1");
- replicaNode2.kit().waitUntilLeader(replicaNode2.configDataStore().getActorContext(), "cars");
- replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars");
+ verifyRaftState(replicaNode2.configDataStore(), "cars",
+ raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
+ verifyRaftState(replicaNode3.configDataStore(), "cars",
+ raftState -> assertThat(raftState.getLeader(),containsString("member-1")));
doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
- member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "cars");
- replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars");
+ verifyRaftState(member1.configDataStore(), "cars",
+ raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
+ verifyRaftState(replicaNode3.configDataStore(), "cars",
+ raftState -> assertThat(raftState.getLeader(),containsString("member-2")));
+ replicaNode2.waitForMembersUp("member-3");
doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
}
org.slf4j.simpleLogger.showShortLogName=true
org.slf4j.simpleLogger.levelInBrackets=true
org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
-org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.Shard=error
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.Shard=debug
org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.utils.ActorContext=error
org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.raft.RaftActorServerConfigurationSupport=debug
-org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
\ No newline at end of file
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off