Bug 7407 - Add request leadership functionality to shards 42/53542/21
authorJakub Morvay <jmorvay@cisco.com>
Mon, 20 Mar 2017 08:58:11 +0000 (09:58 +0100)
committerTom Pantelis <tompantelis@gmail.com>
Sat, 1 Apr 2017 08:36:45 +0000 (08:36 +0000)
This adds a new MakeLeaderLocal message to Shard class API.
MakeLeaderLocal message is sent to a local shard replica to request
the shard leader to be moved to the local node. Local shard will
contact the current leader with RequestLeadership message to initiate
leadeship transfer to itself. Original sender of MakeLeaderLocal
message will be notified about result of this operation.

Change-Id: I2b0ee7caf772457e31250d1bdddd5fc77b16fc53
Signed-off-by: Jakub Morvay <jmorvay@cisco.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestLeadership.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MakeLeaderLocal.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java
new file mode 100644 (file)
index 0000000..ce4cfcd
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+public class LeadershipTransferFailedException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public LeadershipTransferFailedException(final String message) {
+        super(message);
+    }
+}
index 1c057d755369d50a2147cf7c3e2922783ae56fc8..70a0b86952a3ea22e04921860ae7a2bd5ef97dc1 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
+import akka.actor.Status;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -49,6 +50,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
@@ -263,12 +265,53 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof Runnable) {
             ((Runnable)message).run();
         } else if (message instanceof NoopPayload) {
-            persistData(null, null, (NoopPayload)message, false);
+            persistData(null, null, (NoopPayload) message, false);
+        } else if (message instanceof RequestLeadership) {
+            onRequestLeadership((RequestLeadership) message);
         } else if (!possiblyHandleBehaviorMessage(message)) {
             handleNonRaftCommand(message);
         }
     }
 
+    private void onRequestLeadership(final RequestLeadership message) {
+        LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+        if (!isLeader()) {
+            // non-leader cannot satisfy leadership request
+            LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+                    + " Current behavior: {}. Sending failure response",
+                    persistenceId(), getCurrentBehavior().state());
+            message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+                    + message.getRequestedFollowerId()
+                    + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+            return;
+        }
+
+        final String requestedFollowerId = message.getRequestedFollowerId();
+        final ActorRef replyTo = message.getReplyTo();
+        initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+            @Override
+            public void onSuccess(final ActorRef raftActorRef) {
+                // sanity check
+                if (!requestedFollowerId.equals(getLeaderId())) {
+                    onFailure(raftActorRef);
+                }
+
+                LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Success(null), getSelf());
+            }
+
+            @Override
+            public void onFailure(final ActorRef raftActorRef) {
+                LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Failure(
+                        new LeadershipTransferFailedException(
+                                "Failed to transfer leadership to " + requestedFollowerId
+                                        + ". Follower is not ready to become leader")),
+                        getSelf());
+            }
+        }, message.getRequestedFollowerId());
+    }
+
     private boolean possiblyHandleBehaviorMessage(final Object message) {
         final RaftActorBehavior currentBehavior = getCurrentBehavior();
         final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
@@ -286,10 +329,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+        initiateLeadershipTransfer(onComplete, null);
+    }
+
+    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+                                            final String followerId) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
         if (leadershipTransferInProgress == null) {
-            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
+            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
                 public void onSuccess(ActorRef raftActorRef) {
index 32790d0f47251af840193ebfe77fa51c4702ffd1..59ae4d3069ba97085c3473b5581dbb608dde574f 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -53,15 +54,22 @@ import scala.concurrent.duration.FiniteDuration;
 public class RaftActorLeadershipTransferCohort {
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
 
-    private final RaftActor raftActor;
-    private Cancellable newLeaderTimer;
     private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
-    private long newLeaderTimeoutInMillis = 2000;
     private final Stopwatch transferTimer = Stopwatch.createUnstarted();
+    private final RaftActor raftActor;
+    private final String requestedFollowerId;
+
+    private long newLeaderTimeoutInMillis = 2000;
+    private Cancellable newLeaderTimer;
     private boolean isTransferring;
 
-    RaftActorLeadershipTransferCohort(RaftActor raftActor) {
+    RaftActorLeadershipTransferCohort(final RaftActor raftActor) {
+        this(raftActor, null);
+    }
+
+    RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) {
         this.raftActor = raftActor;
+        this.requestedFollowerId = requestedFollowerId;
     }
 
     void init() {
@@ -144,7 +152,7 @@ public class RaftActorLeadershipTransferCohort {
             }, raftActor.getContext().system().dispatcher(), raftActor.self());
     }
 
-    void onNewLeader(String newLeader) {
+    void onNewLeader(final String newLeader) {
         if (newLeader != null && newLeaderTimer != null) {
             LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
             newLeaderTimer.cancel();
@@ -152,7 +160,7 @@ public class RaftActorLeadershipTransferCohort {
         }
     }
 
-    private void finish(boolean success) {
+    private void finish(final boolean success) {
         isTransferring = false;
         if (transferTimer.isRunning()) {
             transferTimer.stop();
@@ -173,7 +181,7 @@ public class RaftActorLeadershipTransferCohort {
         }
     }
 
-    void addOnComplete(OnComplete onComplete) {
+    void addOnComplete(final OnComplete onComplete) {
         onCompleteCallbacks.add(onComplete);
     }
 
@@ -182,10 +190,14 @@ public class RaftActorLeadershipTransferCohort {
     }
 
     @VisibleForTesting
-    void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
+    void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
         this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
     }
 
+    public Optional<String> getRequestedFollowerId() {
+        return Optional.fromNullable(requestedFollowerId);
+    }
+
     interface OnComplete {
         void onSuccess(ActorRef raftActorRef);
 
index f5f3f51819be30fdeefb020ebf20100fa7c6cf29..eb49abc17aea4a0ebefacde0a5925312082e1405 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
@@ -134,6 +135,13 @@ public class Leader extends AbstractLeader {
             return;
         }
 
+        final Optional<String> requestedFollowerIdOptional
+                = leadershipTransferContext.transferCohort.getRequestedFollowerId();
+        if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.get().equals(followerId)) {
+            // we want to transfer leadership to specific follower
+            return;
+        }
+
         FollowerLogInformation followerInfo = getFollower(followerId);
         if (followerInfo == null) {
             return;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestLeadership.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestLeadership.java
new file mode 100644 (file)
index 0000000..5561085
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+
+/**
+ * Message sent to leader to transfer leadership to a particular follower.
+ */
+public final class RequestLeadership implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String requestedFollowerId;
+    private final ActorRef replyTo;
+
+    public RequestLeadership(final String requestedFollowerId, final ActorRef replyTo) {
+        this.requestedFollowerId = Preconditions.checkNotNull(requestedFollowerId);
+        this.replyTo = Preconditions.checkNotNull(replyTo);
+    }
+
+    public String getRequestedFollowerId() {
+        return requestedFollowerId;
+    }
+
+    public ActorRef getReplyTo() {
+        return replyTo;
+    }
+
+    @Override
+    public String toString() {
+        return "RequestLeadership [requestedFollowerId=" + requestedFollowerId + ", replyTo=" + replyTo + "]";
+    }
+}
index 83f5513d7e9dd37516c5285be0dedea4a2f9eb1a..7ed45956c17f77f16cc6f60493de4097680d31c1 100644 (file)
@@ -8,12 +8,14 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.actor.Status;
 import akka.pattern.Patterns;
 import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
@@ -28,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitionin
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -47,6 +50,7 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
     private TestActorRef<MessageCollectorActor> follower3NotifierActor;
     private TestActorRef<TestRaftActor> follower3Actor;
     private ActorRef follower3CollectorActor;
+    private ActorRef requestLeadershipResultCollectorActor;
 
     @Test
     public void testLeaderTransferOnShutDown() throws Exception {
@@ -224,4 +228,78 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
 
         testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
     }
+
+    private void sendFollower2RequestLeadershipTransferToLeader() {
+        testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
+
+        leaderActor.tell(
+                new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
+
+        testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
+    }
+
+    private void createRequestLeadershipResultCollectorActor() {
+        testLog.info("createRequestLeadershipResultCollectorActor starting");
+
+        requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
+
+        testLog.info("createRequestLeadershipResultCollectorActor ending");
+    }
+
+    @Test
+    public void testSuccessfulRequestLeadershipTransferToFollower2() {
+        testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
+
+        createRaftActors();
+        createRequestLeadershipResultCollectorActor();
+
+        sendFollower2RequestLeadershipTransferToLeader();
+
+        verifyRaftState(follower2Actor, RaftState.Leader);
+
+        expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
+
+        testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
+    }
+
+    @Test
+    public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
+
+        createRaftActors();
+        createRequestLeadershipResultCollectorActor();
+
+        sendPayloadWithFollower2Lagging();
+
+        sendFollower2RequestLeadershipTransferToLeader();
+
+        verifyRaftState(follower1Actor, RaftState.Follower);
+        verifyRaftState(follower2Actor, RaftState.Follower);
+        verifyRaftState(follower3Actor, RaftState.Follower);
+
+        Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
+        assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
+
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
+    }
+
+    @Test
+    public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
+
+        createRaftActors();
+        createRequestLeadershipResultCollectorActor();
+
+        sendShutDown(follower2Actor);
+
+        sendFollower2RequestLeadershipTransferToLeader();
+
+        verifyRaftState(follower1Actor, RaftState.Follower);
+        verifyRaftState(follower3Actor, RaftState.Follower);
+
+        Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
+        assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
+
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
+    }
 }
index c83a1a1d5b675d53d965b446fc40fc1da8c7e827..76322eec1a4c6de7756d4ca9f3447b997693c3e0 100644 (file)
@@ -14,6 +14,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -24,6 +25,7 @@ import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -2079,6 +2081,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
 
@@ -2109,6 +2112,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
 
         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
@@ -2140,6 +2144,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
 
         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
index 19a2151d2fc925284a39d65960e80819d425e154..39ee810eb0f3b29a0544e516b56d9e086dc5fb2d 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import akka.actor.Props;
+import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
@@ -65,6 +66,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
@@ -80,6 +82,7 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
@@ -87,6 +90,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -339,12 +343,40 @@ public class Shard extends RaftActor {
             } else if (message instanceof PersistAbortTransactionPayload) {
                 final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
                 persistPayload(txId, AbortTransactionPayload.create(txId), true);
+            } else if (message instanceof MakeLeaderLocal) {
+                onMakeLeaderLocal();
             } else {
                 super.handleNonRaftCommand(message);
             }
         }
     }
 
+    private void onMakeLeaderLocal() {
+        LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
+        if (isLeader()) {
+            getSender().tell(new Status.Success(null), getSelf());
+            return;
+        }
+
+        final ActorSelection leader = getLeader();
+
+        if (leader == null) {
+            // Leader is not present. The cluster is most likely trying to
+            // elect a leader and we should let that run its normal course
+
+            // TODO we can wait for the election to complete and retry the
+            // request. We can also let the caller retry by sending a flag
+            // in the response indicating the request is "reTryable".
+            getSender().tell(new Failure(
+                    new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
+                            + "Currently there is no leader for " + persistenceId())),
+                    getSelf());
+            return;
+        }
+
+        leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
+    }
+
     // Acquire our frontend tracking handle and verify generation matches
     private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
         final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MakeLeaderLocal.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MakeLeaderLocal.java
new file mode 100644 (file)
index 0000000..4e0c9ca
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Message sent to local shard to try to gain shard leadership. Sender of this
+ * message will be notified about result of leadership transfer with
+ * {@link akka.actor.Status.Success}, if leadership is successfully transferred
+ * to local shard. Otherwise {@link akka.actor.Status.Failure} with
+ * {@link org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException}
+ * will be sent to sender of this message.
+ */
+public final class MakeLeaderLocal {
+
+    public static final MakeLeaderLocal INSTANCE = new MakeLeaderLocal();
+
+    private MakeLeaderLocal() {
+
+    }
+}