Bug 7407 - Add request leadership functionality to shards 42/53542/21
authorJakub Morvay <jmorvay@cisco.com>
Mon, 20 Mar 2017 08:58:11 +0000 (09:58 +0100)
committerTom Pantelis <tompantelis@gmail.com>
Sat, 1 Apr 2017 08:36:45 +0000 (08:36 +0000)
This adds a new MakeLeaderLocal message to Shard class API.
MakeLeaderLocal message is sent to a local shard replica to request
the shard leader to be moved to the local node. Local shard will
contact the current leader with RequestLeadership message to initiate
leadeship transfer to itself. Original sender of MakeLeaderLocal
message will be notified about result of this operation.

Change-Id: I2b0ee7caf772457e31250d1bdddd5fc77b16fc53
Signed-off-by: Jakub Morvay <jmorvay@cisco.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestLeadership.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MakeLeaderLocal.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java
new file mode 100644 (file)
index 0000000..ce4cfcd
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+public class LeadershipTransferFailedException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public LeadershipTransferFailedException(final String message) {
+        super(message);
+    }
+}
index 1c057d7..70a0b86 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
+import akka.actor.Status;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -49,6 +50,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
@@ -263,12 +265,53 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof Runnable) {
             ((Runnable)message).run();
         } else if (message instanceof NoopPayload) {
-            persistData(null, null, (NoopPayload)message, false);
+            persistData(null, null, (NoopPayload) message, false);
+        } else if (message instanceof RequestLeadership) {
+            onRequestLeadership((RequestLeadership) message);
         } else if (!possiblyHandleBehaviorMessage(message)) {
             handleNonRaftCommand(message);
         }
     }
 
+    private void onRequestLeadership(final RequestLeadership message) {
+        LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+        if (!isLeader()) {
+            // non-leader cannot satisfy leadership request
+            LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+                    + " Current behavior: {}. Sending failure response",
+                    persistenceId(), getCurrentBehavior().state());
+            message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+                    + message.getRequestedFollowerId()
+                    + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+            return;
+        }
+
+        final String requestedFollowerId = message.getRequestedFollowerId();
+        final ActorRef replyTo = message.getReplyTo();
+        initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+            @Override
+            public void onSuccess(final ActorRef raftActorRef) {
+                // sanity check
+                if (!requestedFollowerId.equals(getLeaderId())) {
+                    onFailure(raftActorRef);
+                }
+
+                LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Success(null), getSelf());
+            }
+
+            @Override
+            public void onFailure(final ActorRef raftActorRef) {
+                LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Failure(
+                        new LeadershipTransferFailedException(
+                                "Failed to transfer leadership to " + requestedFollowerId
+                                        + ". Follower is not ready to become leader")),
+                        getSelf());
+            }
+        }, message.getRequestedFollowerId());
+    }
+
     private boolean possiblyHandleBehaviorMessage(final Object message) {
         final RaftActorBehavior currentBehavior = getCurrentBehavior();
         final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
@@ -286,10 +329,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+        initiateLeadershipTransfer(onComplete, null);
+    }
+
+    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+                                            final String followerId) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
         if (leadershipTransferInProgress == null) {
-            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
+            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
                 public void onSuccess(ActorRef raftActorRef) {
index 32790d0..59ae4d3 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -53,15 +54,22 @@ import scala.concurrent.duration.FiniteDuration;
 public class RaftActorLeadershipTransferCohort {
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
 
-    private final RaftActor raftActor;
-    private Cancellable newLeaderTimer;
     private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
-    private long newLeaderTimeoutInMillis = 2000;
     private final Stopwatch transferTimer = Stopwatch.createUnstarted();
+    private final RaftActor raftActor;
+    private final String requestedFollowerId;
+
+    private long newLeaderTimeoutInMillis = 2000;
+    private Cancellable newLeaderTimer;
     private boolean isTransferring;
 
-    RaftActorLeadershipTransferCohort(RaftActor raftActor) {
+    RaftActorLeadershipTransferCohort(final RaftActor raftActor) {
+        this(raftActor, null);
+    }
+
+    RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) {
         this.raftActor = raftActor;
+        this.requestedFollowerId = requestedFollowerId;
     }
 
     void init() {
@@ -144,7 +152,7 @@ public class RaftActorLeadershipTransferCohort {
             }, raftActor.getContext().system().dispatcher(), raftActor.self());
     }
 
-    void onNewLeader(String newLeader) {
+    void onNewLeader(final String newLeader) {
         if (newLeader != null && newLeaderTimer != null) {
             LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
             newLeaderTimer.cancel();
@@ -152,7 +160,7 @@ public class RaftActorLeadershipTransferCohort {
         }
     }
 
-    private void finish(boolean success) {
+    private void finish(final boolean success) {
         isTransferring = false;
         if (transferTimer.isRunning()) {
             transferTimer.stop();
@@ -173,7 +181,7 @@ public class RaftActorLeadershipTransferCohort {
         }
     }
 
-    void addOnComplete(OnComplete onComplete) {
+    void addOnComplete(final OnComplete onComplete) {
         onCompleteCallbacks.add(onComplete);
     }
 
@@ -182,10 +190,14 @@ public class RaftActorLeadershipTransferCohort {
     }
 
     @VisibleForTesting
-    void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
+    void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
         this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
     }
 
+    public Optional<String> getRequestedFollowerId() {
+        return Optional.fromNullable(requestedFollowerId);
+    }
+
     interface OnComplete {
         void onSuccess(ActorRef raftActorRef);
 
index f5f3f51..eb49abc 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
@@ -134,6 +135,13 @@ public class Leader extends AbstractLeader {
             return;
         }
 
+        final Optional<String> requestedFollowerIdOptional
+                = leadershipTransferContext.transferCohort.getRequestedFollowerId();
+        if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.get().equals(followerId)) {
+            // we want to transfer leadership to specific follower
+            return;
+        }
+
         FollowerLogInformation followerInfo = getFollower(followerId);
         if (followerInfo == null) {
             return;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestLeadership.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestLeadership.java
new file mode 100644 (file)
index 0000000..5561085
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+
+/**
+ * Message sent to leader to transfer leadership to a particular follower.
+ */
+public final class RequestLeadership implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String requestedFollowerId;
+    private final ActorRef replyTo;
+
+    public RequestLeadership(final String requestedFollowerId, final ActorRef replyTo) {
+        this.requestedFollowerId = Preconditions.checkNotNull(requestedFollowerId);
+        this.replyTo = Preconditions.checkNotNull(replyTo);
+    }
+
+    public String getRequestedFollowerId() {
+        return requestedFollowerId;
+    }
+
+    public ActorRef getReplyTo() {
+        return replyTo;
+    }
+
+    @Override
+    public String toString() {
+        return "RequestLeadership [requestedFollowerId=" + requestedFollowerId + ", replyTo=" + replyTo + "]";
+    }
+}
index 83f5513..7ed4595 100644 (file)
@@ -8,12 +8,14 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.actor.Status;
 import akka.pattern.Patterns;
 import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
@@ -28,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitionin
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -47,6 +50,7 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
     private TestActorRef<MessageCollectorActor> follower3NotifierActor;
     private TestActorRef<TestRaftActor> follower3Actor;
     private ActorRef follower3CollectorActor;
+    private ActorRef requestLeadershipResultCollectorActor;
 
     @Test
     public void testLeaderTransferOnShutDown() throws Exception {
@@ -224,4 +228,78 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
 
         testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
     }
+
+    private void sendFollower2RequestLeadershipTransferToLeader() {
+        testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
+
+        leaderActor.tell(
+                new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
+
+        testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
+    }
+
+    private void createRequestLeadershipResultCollectorActor() {
+        testLog.info("createRequestLeadershipResultCollectorActor starting");
+
+        requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
+
+        testLog.info("createRequestLeadershipResultCollectorActor ending");
+    }
+
+    @Test
+    public void testSuccessfulRequestLeadershipTransferToFollower2() {
+        testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
+
+        createRaftActors();
+        createRequestLeadershipResultCollectorActor();
+
+        sendFollower2RequestLeadershipTransferToLeader();
+
+        verifyRaftState(follower2Actor, RaftState.Leader);
+
+        expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
+
+        testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
+    }
+
+    @Test
+    public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
+
+        createRaftActors();
+        createRequestLeadershipResultCollectorActor();
+
+        sendPayloadWithFollower2Lagging();
+
+        sendFollower2RequestLeadershipTransferToLeader();
+
+        verifyRaftState(follower1Actor, RaftState.Follower);
+        verifyRaftState(follower2Actor, RaftState.Follower);
+        verifyRaftState(follower3Actor, RaftState.Follower);
+
+        Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
+        assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
+
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
+    }
+
+    @Test
+    public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
+
+        createRaftActors();
+        createRequestLeadershipResultCollectorActor();
+
+        sendShutDown(follower2Actor);
+
+        sendFollower2RequestLeadershipTransferToLeader();
+
+        verifyRaftState(follower1Actor, RaftState.Follower);
+        verifyRaftState(follower3Actor, RaftState.Follower);
+
+        Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
+        assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
+
+        testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
+    }
 }
index c83a1a1..76322ee 100644 (file)
@@ -14,6 +14,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -24,6 +25,7 @@ import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -2079,6 +2081,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
 
@@ -2109,6 +2112,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
 
         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
@@ -2140,6 +2144,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
 
         RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
         leader.transferLeadership(mockTransferCohort);
 
         verify(mockTransferCohort, never()).transferComplete();
index 19a2151..39ee810 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import akka.actor.Props;
+import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
@@ -65,6 +66,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
@@ -80,6 +82,7 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
@@ -87,6 +90,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -339,12 +343,40 @@ public class Shard extends RaftActor {
             } else if (message instanceof PersistAbortTransactionPayload) {
                 final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
                 persistPayload(txId, AbortTransactionPayload.create(txId), true);
+            } else if (message instanceof MakeLeaderLocal) {
+                onMakeLeaderLocal();
             } else {
                 super.handleNonRaftCommand(message);
             }
         }
     }
 
+    private void onMakeLeaderLocal() {
+        LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
+        if (isLeader()) {
+            getSender().tell(new Status.Success(null), getSelf());
+            return;
+        }
+
+        final ActorSelection leader = getLeader();
+
+        if (leader == null) {
+            // Leader is not present. The cluster is most likely trying to
+            // elect a leader and we should let that run its normal course
+
+            // TODO we can wait for the election to complete and retry the
+            // request. We can also let the caller retry by sending a flag
+            // in the response indicating the request is "reTryable".
+            getSender().tell(new Failure(
+                    new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
+                            + "Currently there is no leader for " + persistenceId())),
+                    getSelf());
+            return;
+        }
+
+        leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
+    }
+
     // Acquire our frontend tracking handle and verify generation matches
     private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
         final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MakeLeaderLocal.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MakeLeaderLocal.java
new file mode 100644 (file)
index 0000000..4e0c9ca
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Message sent to local shard to try to gain shard leadership. Sender of this
+ * message will be notified about result of leadership transfer with
+ * {@link akka.actor.Status.Success}, if leadership is successfully transferred
+ * to local shard. Otherwise {@link akka.actor.Status.Failure} with
+ * {@link org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException}
+ * will be sent to sender of this message.
+ */
+public final class MakeLeaderLocal {
+
+    public static final MakeLeaderLocal INSTANCE = new MakeLeaderLocal();
+
+    private MakeLeaderLocal() {
+
+    }
+}