Bug 7805: Add make-leader-local RPC for module-based shards.
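
The new make-leader-local RPC locates the local replica of the named
module-based shard and asks it to take over leadership, waiting up to
twice the shard leader election timeout for the transfer to complete.

A minimal caller sketch, assuming a ClusterAdminService reference and the
generated MakeLeaderLocalInputBuilder from the cluster-admin model
(clusterAdmin and LOG below are illustrative, not part of this patch):

    MakeLeaderLocalInput input = new MakeLeaderLocalInputBuilder()
            .setShardName("default")
            .setDataStoreType(DataStoreType.Config)
            .build();

    // get() blocks until the RPC completes; checked-exception handling is
    // elided in this sketch.
    RpcResult<Void> result = clusterAdmin.makeLeaderLocal(input).get();
    if (!result.isSuccessful()) {
        LOG.warn("make-leader-local failed for shard {}: {}",
                input.getShardName(), result.getErrors());
    }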
diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
index 3627bd80facea719dddddcd61065218efff1d2d7..ab85331c14d57a052a5c84e92713b89a8adaffb1 100644
@@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardRepl
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
 import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
@@ -57,6 +58,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
@@ -86,6 +88,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
     private final DistributedDataStoreInterface configDataStore;
     private final DistributedDataStoreInterface operDataStore;
     private final BindingNormalizedNodeSerializer serializer;
+    private final Timeout makeLeaderLocalTimeout;
 
     public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore,
             DistributedDataStoreInterface operDataStore,
@@ -93,6 +96,10 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         this.configDataStore = configDataStore;
         this.operDataStore = operDataStore;
         this.serializer = serializer;
+
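+        // Allow up to twice the shard leader election timeout for the leadership
+        // transfer to complete ($times is Scala FiniteDuration's '*' as seen from Java).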
+        this.makeLeaderLocalTimeout =
+                new Timeout(configDataStore.getActorContext().getDatastoreContext()
+                        .getShardLeaderElectionTimeout().duration().$times(2));
     }
 
     @Override
@@ -167,6 +174,62 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         return returnFuture;
     }
 
+    @Override
+    public Future<RpcResult<Void>> makeLeaderLocal(final MakeLeaderLocalInput input) {
+        final String shardName = input.getShardName();
+        if (Strings.isNullOrEmpty(shardName)) {
+            return newFailedRpcResultFuture("A valid shard name must be specified");
+        }
+
+        DataStoreType dataStoreType = input.getDataStoreType();
+        if (dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        LOG.info("Moving leader to local node for shard {}, datastoreType {}", shardName, dataStoreType);
+
+        ActorContext actorContext = dataStoreType == DataStoreType.Config
+                ? configDataStore.getActorContext()
+                : operDataStore.getActorContext();
+
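+        // Leadership can only be transferred to a replica running on this node, so
+        // resolve the local shard actor first.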
+        final scala.concurrent.Future<ActorRef> localShardReply =
+                actorContext.findLocalShardAsync(shardName);
+
+        final scala.concurrent.Promise<Object> makeLeaderLocalAsk = akka.dispatch.Futures.promise();
+        localShardReply.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable {
+                if (failure != null) {
+                    LOG.warn("No local shard found for {} datastoreType {} - Cannot request leadership transfer to"
+                            + " local shard.", shardName, dataStoreType, failure);
+                    makeLeaderLocalAsk.failure(failure);
+                } else {
+                    makeLeaderLocalAsk
+                            .completeWith(actorContext
+                                    .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout));
+                }
+            }
+        }, actorContext.getClientDispatcher());
+
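+        // Bridge the asynchronous MakeLeaderLocal ask into the RpcResult future
+        // returned to the RPC caller.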
+        final SettableFuture<RpcResult<Void>> future = SettableFuture.create();
+        makeLeaderLocalAsk.future().onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) {
+                    LOG.error("Leadership transfer failed for shard {}.", shardName, failure);
+                    future.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION,
+                            "leadership transfer failed", failure).build());
+                    return;
+                }
+
+                LOG.debug("Leadership transfer complete for shard {}, response: {}", shardName, success);
+                future.set(RpcResultBuilder.<Void>success().build());
+            }
+        }, actorContext.getClientDispatcher());
+
+        return future;
+    }
+
     @Override
     public Future<RpcResult<Void>> addPrefixShardReplica(final AddPrefixShardReplicaInput input) {