This adds a new MakeLeaderLocal message to Shard class API.
MakeLeaderLocal message is sent to a local shard replica to request
the shard leader to be moved to the local node. Local shard will
contact the current leader with RequestLeadership message to initiate
leadeship transfer to itself. Original sender of MakeLeaderLocal
message will be notified about result of this operation.
Change-Id: I2b0ee7caf772457e31250d1bdddd5fc77b16fc53
Signed-off-by: Jakub Morvay <jmorvay@cisco.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+public class LeadershipTransferFailedException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public LeadershipTransferFailedException(final String message) {
+ super(message);
+ }
+}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
+import akka.actor.Status;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
} else if (message instanceof Runnable) {
((Runnable)message).run();
} else if (message instanceof NoopPayload) {
- persistData(null, null, (NoopPayload)message, false);
+ persistData(null, null, (NoopPayload) message, false);
+ } else if (message instanceof RequestLeadership) {
+ onRequestLeadership((RequestLeadership) message);
} else if (!possiblyHandleBehaviorMessage(message)) {
handleNonRaftCommand(message);
}
}
+ private void onRequestLeadership(final RequestLeadership message) {
+ LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+ if (!isLeader()) {
+ // non-leader cannot satisfy leadership request
+ LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+ + " Current behavior: {}. Sending failure response",
+ persistenceId(), getCurrentBehavior().state());
+ message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+ + message.getRequestedFollowerId()
+ + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+ return;
+ }
+
+ final String requestedFollowerId = message.getRequestedFollowerId();
+ final ActorRef replyTo = message.getReplyTo();
+ initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(final ActorRef raftActorRef) {
+ // sanity check
+ if (!requestedFollowerId.equals(getLeaderId())) {
+ onFailure(raftActorRef);
+ }
+
+ LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+ replyTo.tell(new Status.Success(null), getSelf());
+ }
+
+ @Override
+ public void onFailure(final ActorRef raftActorRef) {
+ LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+ replyTo.tell(new Status.Failure(
+ new LeadershipTransferFailedException(
+ "Failed to transfer leadership to " + requestedFollowerId
+ + ". Follower is not ready to become leader")),
+ getSelf());
+ }
+ }, message.getRequestedFollowerId());
+ }
+
private boolean possiblyHandleBehaviorMessage(final Object message) {
final RaftActorBehavior currentBehavior = getCurrentBehavior();
final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
}
private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+ initiateLeadershipTransfer(onComplete, null);
+ }
+
+ private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+ final String followerId) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
if (leadershipTransferInProgress == null) {
- leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
+ leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
public void onSuccess(ActorRef raftActorRef) {
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
public class RaftActorLeadershipTransferCohort {
private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
- private final RaftActor raftActor;
- private Cancellable newLeaderTimer;
private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
- private long newLeaderTimeoutInMillis = 2000;
private final Stopwatch transferTimer = Stopwatch.createUnstarted();
+ private final RaftActor raftActor;
+ private final String requestedFollowerId;
+
+ private long newLeaderTimeoutInMillis = 2000;
+ private Cancellable newLeaderTimer;
private boolean isTransferring;
- RaftActorLeadershipTransferCohort(RaftActor raftActor) {
+ RaftActorLeadershipTransferCohort(final RaftActor raftActor) {
+ this(raftActor, null);
+ }
+
+ RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) {
this.raftActor = raftActor;
+ this.requestedFollowerId = requestedFollowerId;
}
void init() {
}, raftActor.getContext().system().dispatcher(), raftActor.self());
}
- void onNewLeader(String newLeader) {
+ void onNewLeader(final String newLeader) {
if (newLeader != null && newLeaderTimer != null) {
LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
newLeaderTimer.cancel();
}
}
- private void finish(boolean success) {
+ private void finish(final boolean success) {
isTransferring = false;
if (transferTimer.isRunning()) {
transferTimer.stop();
}
}
- void addOnComplete(OnComplete onComplete) {
+ void addOnComplete(final OnComplete onComplete) {
onCompleteCallbacks.add(onComplete);
}
}
@VisibleForTesting
- void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
+ void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
}
+ public Optional<String> getRequestedFollowerId() {
+ return Optional.fromNullable(requestedFollowerId);
+ }
+
interface OnComplete {
void onSuccess(ActorRef raftActorRef);
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
return;
}
+ final Optional<String> requestedFollowerIdOptional
+ = leadershipTransferContext.transferCohort.getRequestedFollowerId();
+ if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.get().equals(followerId)) {
+ // we want to transfer leadership to specific follower
+ return;
+ }
+
FollowerLogInformation followerInfo = getFollower(followerId);
if (followerInfo == null) {
return;
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+
+/**
+ * Message sent to leader to transfer leadership to a particular follower.
+ */
+public final class RequestLeadership implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String requestedFollowerId;
+ private final ActorRef replyTo;
+
+ public RequestLeadership(final String requestedFollowerId, final ActorRef replyTo) {
+ this.requestedFollowerId = Preconditions.checkNotNull(requestedFollowerId);
+ this.replyTo = Preconditions.checkNotNull(replyTo);
+ }
+
+ public String getRequestedFollowerId() {
+ return requestedFollowerId;
+ }
+
+ public ActorRef getReplyTo() {
+ return replyTo;
+ }
+
+ @Override
+ public String toString() {
+ return "RequestLeadership [requestedFollowerId=" + requestedFollowerId + ", replyTo=" + replyTo + "]";
+ }
+}
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.actor.Status;
import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
import com.google.common.collect.ImmutableMap;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import scala.concurrent.Await;
import scala.concurrent.Future;
private TestActorRef<MessageCollectorActor> follower3NotifierActor;
private TestActorRef<TestRaftActor> follower3Actor;
private ActorRef follower3CollectorActor;
+ private ActorRef requestLeadershipResultCollectorActor;
@Test
public void testLeaderTransferOnShutDown() throws Exception {
testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
}
+
+ private void sendFollower2RequestLeadershipTransferToLeader() {
+ testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
+
+ leaderActor.tell(
+ new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
+
+ testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
+ }
+
+ private void createRequestLeadershipResultCollectorActor() {
+ testLog.info("createRequestLeadershipResultCollectorActor starting");
+
+ requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
+
+ testLog.info("createRequestLeadershipResultCollectorActor ending");
+ }
+
+ @Test
+ public void testSuccessfulRequestLeadershipTransferToFollower2() {
+ testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
+
+ createRaftActors();
+ createRequestLeadershipResultCollectorActor();
+
+ sendFollower2RequestLeadershipTransferToLeader();
+
+ verifyRaftState(follower2Actor, RaftState.Leader);
+
+ expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
+
+ testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
+ }
+
+ @Test
+ public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
+ testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
+
+ createRaftActors();
+ createRequestLeadershipResultCollectorActor();
+
+ sendPayloadWithFollower2Lagging();
+
+ sendFollower2RequestLeadershipTransferToLeader();
+
+ verifyRaftState(follower1Actor, RaftState.Follower);
+ verifyRaftState(follower2Actor, RaftState.Follower);
+ verifyRaftState(follower3Actor, RaftState.Follower);
+
+ Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
+ assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
+
+ testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
+ }
+
+ @Test
+ public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
+ testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
+
+ createRaftActors();
+ createRequestLeadershipResultCollectorActor();
+
+ sendShutDown(follower2Actor);
+
+ sendFollower2RequestLeadershipTransferToLeader();
+
+ verifyRaftState(follower1Actor, RaftState.Follower);
+ verifyRaftState(follower3Actor, RaftState.Follower);
+
+ Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
+ assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
+
+ testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
+ }
}
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
+ doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
+import akka.actor.Status;
import akka.actor.Status.Failure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
} else if (message instanceof PersistAbortTransactionPayload) {
final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
persistPayload(txId, AbortTransactionPayload.create(txId), true);
+ } else if (message instanceof MakeLeaderLocal) {
+ onMakeLeaderLocal();
} else {
super.handleNonRaftCommand(message);
}
}
}
+ private void onMakeLeaderLocal() {
+ LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
+ if (isLeader()) {
+ getSender().tell(new Status.Success(null), getSelf());
+ return;
+ }
+
+ final ActorSelection leader = getLeader();
+
+ if (leader == null) {
+ // Leader is not present. The cluster is most likely trying to
+ // elect a leader and we should let that run its normal course
+
+ // TODO we can wait for the election to complete and retry the
+ // request. We can also let the caller retry by sending a flag
+ // in the response indicating the request is "reTryable".
+ getSender().tell(new Failure(
+ new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
+ + "Currently there is no leader for " + persistenceId())),
+ getSelf());
+ return;
+ }
+
+ leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
+ }
+
// Acquire our frontend tracking handle and verify generation matches
private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Message sent to local shard to try to gain shard leadership. Sender of this
+ * message will be notified about result of leadership transfer with
+ * {@link akka.actor.Status.Success}, if leadership is successfully transferred
+ * to local shard. Otherwise {@link akka.actor.Status.Failure} with
+ * {@link org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException}
+ * will be sent to sender of this message.
+ */
+public final class MakeLeaderLocal {
+
+ public static final MakeLeaderLocal INSTANCE = new MakeLeaderLocal();
+
+ private MakeLeaderLocal() {
+
+ }
+}