From 0c05dff15e4f36c5ecbd26e82309de21f67c8cd5 Mon Sep 17 00:00:00 2001 From: Jakub Morvay Date: Mon, 20 Mar 2017 09:58:11 +0100 Subject: [PATCH] Bug 7407 - Add request leadership functionality to shards This adds a new MakeLeaderLocal message to Shard class API. MakeLeaderLocal message is sent to a local shard replica to request the shard leader to be moved to the local node. Local shard will contact the current leader with RequestLeadership message to initiate leadeship transfer to itself. Original sender of MakeLeaderLocal message will be notified about result of this operation. Change-Id: I2b0ee7caf772457e31250d1bdddd5fc77b16fc53 Signed-off-by: Jakub Morvay Signed-off-by: Robert Varga --- .../LeadershipTransferFailedException.java | 17 ++++ .../controller/cluster/raft/RaftActor.java | 52 ++++++++++++- .../RaftActorLeadershipTransferCohort.java | 28 +++++-- .../cluster/raft/behaviors/Leader.java | 8 ++ .../raft/messages/RequestLeadership.java | 41 ++++++++++ .../LeadershipTransferIntegrationTest.java | 78 +++++++++++++++++++ .../cluster/raft/behaviors/LeaderTest.java | 5 ++ .../controller/cluster/datastore/Shard.java | 32 ++++++++ .../datastore/messages/MakeLeaderLocal.java | 26 +++++++ 9 files changed, 277 insertions(+), 10 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestLeadership.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MakeLeaderLocal.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java new file mode 100644 index 0000000000..ce4cfcd908 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +public class LeadershipTransferFailedException extends Exception { + private static final long serialVersionUID = 1L; + + public LeadershipTransferFailedException(final String message) { + super(message); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1c057d7553..70a0b86952 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; +import akka.actor.Status; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -49,6 +50,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; @@ -263,12 +265,53 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof Runnable) { ((Runnable)message).run(); } else if (message instanceof NoopPayload) { - persistData(null, null, (NoopPayload)message, false); + persistData(null, null, (NoopPayload) message, false); + } else if (message instanceof RequestLeadership) { + onRequestLeadership((RequestLeadership) message); } else if (!possiblyHandleBehaviorMessage(message)) { handleNonRaftCommand(message); } } + private void onRequestLeadership(final RequestLeadership message) { + LOG.debug("{}: onRequestLeadership {}", persistenceId(), message); + if (!isLeader()) { + // non-leader cannot satisfy leadership request + LOG.warn("{}: onRequestLeadership {} was sent to non-leader." + + " Current behavior: {}. Sending failure response", + persistenceId(), getCurrentBehavior().state()); + message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to " + + message.getRequestedFollowerId() + + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf()); + return; + } + + final String requestedFollowerId = message.getRequestedFollowerId(); + final ActorRef replyTo = message.getReplyTo(); + initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { + @Override + public void onSuccess(final ActorRef raftActorRef) { + // sanity check + if (!requestedFollowerId.equals(getLeaderId())) { + onFailure(raftActorRef); + } + + LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId); + replyTo.tell(new Status.Success(null), getSelf()); + } + + @Override + public void onFailure(final ActorRef raftActorRef) { + LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId); + replyTo.tell(new Status.Failure( + new LeadershipTransferFailedException( + "Failed to transfer leadership to " + requestedFollowerId + + ". Follower is not ready to become leader")), + getSelf()); + } + }, message.getRequestedFollowerId()); + } + private boolean possiblyHandleBehaviorMessage(final Object message) { final RaftActorBehavior currentBehavior = getCurrentBehavior(); final BehaviorState state = behaviorStateTracker.capture(currentBehavior); @@ -286,10 +329,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { + initiateLeadershipTransfer(onComplete, null); + } + + private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete, + final String followerId) { LOG.debug("{}: Initiating leader transfer", persistenceId()); if (leadershipTransferInProgress == null) { - leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this); + leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId); leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { @Override public void onSuccess(ActorRef raftActorRef) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java index 32790d0f47..59ae4d3069 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java @@ -16,6 +16,7 @@ import com.google.common.base.Stopwatch; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; @@ -53,15 +54,22 @@ import scala.concurrent.duration.FiniteDuration; public class RaftActorLeadershipTransferCohort { private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class); - private final RaftActor raftActor; - private Cancellable newLeaderTimer; private final List onCompleteCallbacks = new ArrayList<>(); - private long newLeaderTimeoutInMillis = 2000; private final Stopwatch transferTimer = Stopwatch.createUnstarted(); + private final RaftActor raftActor; + private final String requestedFollowerId; + + private long newLeaderTimeoutInMillis = 2000; + private Cancellable newLeaderTimer; private boolean isTransferring; - RaftActorLeadershipTransferCohort(RaftActor raftActor) { + RaftActorLeadershipTransferCohort(final RaftActor raftActor) { + this(raftActor, null); + } + + RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) { this.raftActor = raftActor; + this.requestedFollowerId = requestedFollowerId; } void init() { @@ -144,7 +152,7 @@ public class RaftActorLeadershipTransferCohort { }, raftActor.getContext().system().dispatcher(), raftActor.self()); } - void onNewLeader(String newLeader) { + void onNewLeader(final String newLeader) { if (newLeader != null && newLeaderTimer != null) { LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader); newLeaderTimer.cancel(); @@ -152,7 +160,7 @@ public class RaftActorLeadershipTransferCohort { } } - private void finish(boolean success) { + private void finish(final boolean success) { isTransferring = false; if (transferTimer.isRunning()) { transferTimer.stop(); @@ -173,7 +181,7 @@ public class RaftActorLeadershipTransferCohort { } } - void addOnComplete(OnComplete onComplete) { + void addOnComplete(final OnComplete onComplete) { onCompleteCallbacks.add(onComplete); } @@ -182,10 +190,14 @@ public class RaftActorLeadershipTransferCohort { } @VisibleForTesting - void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) { + void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) { this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis; } + public Optional getRequestedFollowerId() { + return Optional.fromNullable(requestedFollowerId); + } + interface OnComplete { void onSuccess(ActorRef raftActorRef); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index f5f3f51819..eb49abc17a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; @@ -134,6 +135,13 @@ public class Leader extends AbstractLeader { return; } + final Optional requestedFollowerIdOptional + = leadershipTransferContext.transferCohort.getRequestedFollowerId(); + if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.get().equals(followerId)) { + // we want to transfer leadership to specific follower + return; + } + FollowerLogInformation followerInfo = getFollower(followerId); if (followerInfo == null) { return; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestLeadership.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestLeadership.java new file mode 100644 index 0000000000..5561085539 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestLeadership.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import java.io.Serializable; + +/** + * Message sent to leader to transfer leadership to a particular follower. + */ +public final class RequestLeadership implements Serializable { + private static final long serialVersionUID = 1L; + + private final String requestedFollowerId; + private final ActorRef replyTo; + + public RequestLeadership(final String requestedFollowerId, final ActorRef replyTo) { + this.requestedFollowerId = Preconditions.checkNotNull(requestedFollowerId); + this.replyTo = Preconditions.checkNotNull(replyTo); + } + + public String getRequestedFollowerId() { + return requestedFollowerId; + } + + public ActorRef getReplyTo() { + return replyTo; + } + + @Override + public String toString() { + return "RequestLeadership [requestedFollowerId=" + requestedFollowerId + ", replyTo=" + replyTo + "]"; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java index 83f5513d7e..7ed45956c1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java @@ -8,12 +8,14 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching; import akka.actor.ActorRef; import akka.actor.Props; +import akka.actor.Status; import akka.pattern.Patterns; import akka.testkit.TestActorRef; import com.google.common.collect.ImmutableMap; @@ -28,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitionin import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.Await; import scala.concurrent.Future; @@ -47,6 +50,7 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat private TestActorRef follower3NotifierActor; private TestActorRef follower3Actor; private ActorRef follower3CollectorActor; + private ActorRef requestLeadershipResultCollectorActor; @Test public void testLeaderTransferOnShutDown() throws Exception { @@ -224,4 +228,78 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending"); } + + private void sendFollower2RequestLeadershipTransferToLeader() { + testLog.info("sendFollower2RequestLeadershipTransferToLeader starting"); + + leaderActor.tell( + new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender()); + + testLog.info("sendFollower2RequestLeadershipTransferToLeader ending"); + } + + private void createRequestLeadershipResultCollectorActor() { + testLog.info("createRequestLeadershipResultCollectorActor starting"); + + requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props()); + + testLog.info("createRequestLeadershipResultCollectorActor ending"); + } + + @Test + public void testSuccessfulRequestLeadershipTransferToFollower2() { + testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting"); + + createRaftActors(); + createRequestLeadershipResultCollectorActor(); + + sendFollower2RequestLeadershipTransferToLeader(); + + verifyRaftState(follower2Actor, RaftState.Leader); + + expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1); + + testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending"); + } + + @Test + public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() { + testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting"); + + createRaftActors(); + createRequestLeadershipResultCollectorActor(); + + sendPayloadWithFollower2Lagging(); + + sendFollower2RequestLeadershipTransferToLeader(); + + verifyRaftState(follower1Actor, RaftState.Follower); + verifyRaftState(follower2Actor, RaftState.Follower); + verifyRaftState(follower3Actor, RaftState.Follower); + + Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class); + assertTrue(failure.cause() instanceof LeadershipTransferFailedException); + + testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending"); + } + + @Test + public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception { + testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting"); + + createRaftActors(); + createRequestLeadershipResultCollectorActor(); + + sendShutDown(follower2Actor); + + sendFollower2RequestLeadershipTransferToLeader(); + + verifyRaftState(follower1Actor, RaftState.Follower); + verifyRaftState(follower3Actor, RaftState.Follower); + + Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class); + assertTrue(failure.cause() instanceof LeadershipTransferFailedException); + + testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending"); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index c83a1a1d5b..76322eec1a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -14,6 +14,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -24,6 +25,7 @@ import akka.actor.Props; import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; @@ -2079,6 +2081,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); @@ -2109,6 +2112,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); @@ -2140,6 +2144,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 19a2151d2f..39ee810eb0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -12,6 +12,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import akka.actor.Props; +import akka.actor.Status; import akka.actor.Status.Failure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; @@ -65,6 +66,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; +import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload; @@ -80,6 +82,7 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; +import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; @@ -87,6 +90,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; @@ -339,12 +343,40 @@ public class Shard extends RaftActor { } else if (message instanceof PersistAbortTransactionPayload) { final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId(); persistPayload(txId, AbortTransactionPayload.create(txId), true); + } else if (message instanceof MakeLeaderLocal) { + onMakeLeaderLocal(); } else { super.handleNonRaftCommand(message); } } } + private void onMakeLeaderLocal() { + LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); + if (isLeader()) { + getSender().tell(new Status.Success(null), getSelf()); + return; + } + + final ActorSelection leader = getLeader(); + + if (leader == null) { + // Leader is not present. The cluster is most likely trying to + // elect a leader and we should let that run its normal course + + // TODO we can wait for the election to complete and retry the + // request. We can also let the caller retry by sending a flag + // in the response indicating the request is "reTryable". + getSender().tell(new Failure( + new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. " + + "Currently there is no leader for " + persistenceId())), + getSelf()); + return; + } + + leader.tell(new RequestLeadership(getId(), getSender()), getSelf()); + } + // Acquire our frontend tracking handle and verify generation matches private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException { final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MakeLeaderLocal.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MakeLeaderLocal.java new file mode 100644 index 0000000000..4e0c9cab28 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MakeLeaderLocal.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +/** + * Message sent to local shard to try to gain shard leadership. Sender of this + * message will be notified about result of leadership transfer with + * {@link akka.actor.Status.Success}, if leadership is successfully transferred + * to local shard. Otherwise {@link akka.actor.Status.Failure} with + * {@link org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException} + * will be sent to sender of this message. + */ +public final class MakeLeaderLocal { + + public static final MakeLeaderLocal INSTANCE = new MakeLeaderLocal(); + + private MakeLeaderLocal() { + + } +} -- 2.36.6