Add locate-shard RPC
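
Introduce a cluster-admin RPC which reports where the leader of a given
shard resides. For either datastore, the implementation resolves the
primary shard via ActorUtils.findPrimaryShardAsync(): if the leader's
data tree is available locally, the output flags the member node as
local, otherwise it carries the leader's serialized actor reference.
While in the area, switch the service from ActorContext to ActorUtils
and clean up a few logging and FindBugs warnings.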
[controller.git] opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
index 427d28585f650c4cb67d452481ff12958070bce2..de7709938f623ff5bf4736b637707b085bd06a53 100644
@@ -8,11 +8,11 @@
 package org.opendaylight.controller.cluster.datastore.admin;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.Status.Success;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-import com.google.common.base.Function;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.FutureCallback;
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
@@ -30,7 +31,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import org.apache.commons.lang3.SerializationUtils;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
@@ -40,11 +43,12 @@ import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVo
 import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
 import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
@@ -77,6 +81,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutputBuilder;
@@ -89,10 +96,13 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRefBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LocalBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -109,6 +119,11 @@ public class ClusterAdminRpcService implements ClusterAdminService {
     private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
 
     private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
+    private static final @NonNull RpcResult<LocateShardOutput> LOCAL_SHARD_RESULT =
+            RpcResultBuilder.success(new LocateShardOutputBuilder()
+                .setMemberNode(new LocalBuilder().setLocal(Empty.getInstance()).build())
+                .build())
+            .build();
 
     private final DistributedDataStoreInterface configDataStore;
     private final DistributedDataStoreInterface operDataStore;
@@ -123,7 +138,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         this.serializer = serializer;
 
         this.makeLeaderLocalTimeout =
-                new Timeout(configDataStore.getActorContext().getDatastoreContext()
+                new Timeout(configDataStore.getActorUtils().getDatastoreContext()
                         .getShardLeaderElectionTimeout().duration().$times(2));
     }
 
@@ -200,6 +215,48 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         return returnFuture;
     }
 
+    @Override
+    public ListenableFuture<RpcResult<LocateShardOutput>> locateShard(final LocateShardInput input) {
+        final ActorUtils utils;
+        switch (input.getDataStoreType()) {
+            case Config:
+                utils = configDataStore.getActorUtils();
+                break;
+            case Operational:
+                utils = operDataStore.getActorUtils();
+                break;
+            default:
+                return newFailedRpcResultFuture("Unhandled datastore in " + input);
+        }
+
+        final SettableFuture<RpcResult<LocateShardOutput>> ret = SettableFuture.create();
+        utils.findPrimaryShardAsync(input.getShardName()).onComplete(new OnComplete<PrimaryShardInfo>() {
+            @Override
+            public void onComplete(final Throwable failure, final PrimaryShardInfo success) throws Throwable {
+                if (failure != null) {
+                    LOG.debug("Failed to find shard for {}", input, failure);
+                    ret.setException(failure);
+                    return;
+                }
+
+                // Data tree implies local leader
+                if (success.getLocalShardDataTree().isPresent()) {
+                    ret.set(LOCAL_SHARD_RESULT);
+                    return;
+                }
+
+                final ActorSelection actorPath = success.getPrimaryShardActor();
+                ret.set(newSuccessfulResult(new LocateShardOutputBuilder()
+                    .setMemberNode(new LeaderActorRefBuilder()
+                        .setLeaderActorRef(actorPath.toSerializationFormat())
+                        .build())
+                    .build()));
+            }
+        }, utils.getClientDispatcher());
+
+        return ret;
+    }
+
     @Override
     public ListenableFuture<RpcResult<MakeLeaderLocalOutput>> makeLeaderLocal(final MakeLeaderLocalInput input) {
         final String shardName = input.getShardName();
@@ -212,36 +269,35 @@ public class ClusterAdminRpcService implements ClusterAdminService {
             return newFailedRpcResultFuture("A valid DataStoreType must be specified");
         }
 
-        ActorContext actorContext = dataStoreType == DataStoreType.Config
-                ? configDataStore.getActorContext()
-                : operDataStore.getActorContext();
+        ActorUtils actorUtils = dataStoreType == DataStoreType.Config
+                ? configDataStore.getActorUtils() : operDataStore.getActorUtils();
 
         LOG.info("Moving leader to local node {} for shard {}, datastoreType {}",
-                actorContext.getCurrentMemberName().getName(), shardName, dataStoreType);
+                actorUtils.getCurrentMemberName().getName(), shardName, dataStoreType);
 
         final scala.concurrent.Future<ActorRef> localShardReply =
-                actorContext.findLocalShardAsync(shardName);
+                actorUtils.findLocalShardAsync(shardName);
 
         final scala.concurrent.Promise<Object> makeLeaderLocalAsk = akka.dispatch.Futures.promise();
         localShardReply.onComplete(new OnComplete<ActorRef>() {
             @Override
-            public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable {
+            public void onComplete(final Throwable failure, final ActorRef actorRef) {
                 if (failure != null) {
                     LOG.warn("No local shard found for {} datastoreType {} - Cannot request leadership transfer to"
-                                    + " local shard.", shardName, failure);
+                            + " local shard.", shardName, dataStoreType, failure);
                     makeLeaderLocalAsk.failure(failure);
                 } else {
                     makeLeaderLocalAsk
-                            .completeWith(actorContext
+                            .completeWith(actorUtils
                                     .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout));
                 }
             }
-        }, actorContext.getClientDispatcher());
+        }, actorUtils.getClientDispatcher());
 
         final SettableFuture<RpcResult<MakeLeaderLocalOutput>> future = SettableFuture.create();
         makeLeaderLocalAsk.future().onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+            public void onComplete(final Throwable failure, final Object success) {
                 if (failure != null) {
                     LOG.error("Leadership transfer failed for shard {}.", shardName, failure);
                     future.set(RpcResultBuilder.<MakeLeaderLocalOutput>failed().withError(ErrorType.APPLICATION,
@@ -252,7 +308,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                 LOG.debug("Leadership transfer complete");
                 future.set(RpcResultBuilder.success(new MakeLeaderLocalOutputBuilder().build()).build());
             }
-        }, actorContext.getClientDispatcher());
+        }, actorUtils.getClientDispatcher());
 
         return future;
     }
@@ -342,10 +398,9 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         LOG.info("Adding replicas for all shards");
 
         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-        Function<String, Object> messageSupplier = AddShardReplica::new;
 
-        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
-        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, AddShardReplica::new);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, AddShardReplica::new);
 
         return waitForShardResults(shardResultData, shardResults ->
                 new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(),
@@ -625,14 +680,14 @@ public class ClusterAdminRpcService implements ClusterAdminService {
     private <T> void sendMessageToManagerForConfiguredShards(final DataStoreType dataStoreType,
             final List<Entry<ListenableFuture<T>, ShardResultBuilder>> shardResultData,
             final Function<String, Object> messageSupplier) {
-        ActorContext actorContext = dataStoreType == DataStoreType.Config ? configDataStore.getActorContext()
-                : operDataStore.getActorContext();
-        Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+        ActorUtils actorUtils = dataStoreType == DataStoreType.Config ? configDataStore.getActorUtils()
+                : operDataStore.getActorUtils();
+        Set<String> allShardNames = actorUtils.getConfiguration().getAllShardNames();
 
-        LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreName());
+        LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorUtils.getDataStoreName());
 
         for (String shardName: allShardNames) {
-            ListenableFuture<T> future = this.ask(actorContext.getShardManager(), messageSupplier.apply(shardName),
+            ListenableFuture<T> future = this.ask(actorUtils.getShardManager(), messageSupplier.apply(shardName),
                                                   SHARD_MGR_TIMEOUT);
             shardResultData.add(new SimpleEntry<>(future,
                     new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
@@ -641,16 +696,16 @@ public class ClusterAdminRpcService implements ClusterAdminService {
 
     private <T> ListenableFuture<List<T>> sendMessageToShardManagers(final Object message) {
         Timeout timeout = SHARD_MGR_TIMEOUT;
-        ListenableFuture<T> configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout);
-        ListenableFuture<T> operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout);
+        ListenableFuture<T> configFuture = ask(configDataStore.getActorUtils().getShardManager(), message, timeout);
+        ListenableFuture<T> operFuture = ask(operDataStore.getActorUtils().getShardManager(), message, timeout);
 
         return Futures.allAsList(configFuture, operFuture);
     }
 
     private <T> ListenableFuture<T> sendMessageToShardManager(final DataStoreType dataStoreType, final Object message) {
         ActorRef shardManager = dataStoreType == DataStoreType.Config
-                ? configDataStore.getActorContext().getShardManager()
-                        : operDataStore.getActorContext().getShardManager();
+                ? configDataStore.getActorUtils().getShardManager()
+                        : operDataStore.getActorUtils().getShardManager();
         return ask(shardManager, message, SHARD_MGR_TIMEOUT);
     }
 
@@ -672,9 +727,10 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure);
     }
 
+    @SuppressFBWarnings("SLF4J_SIGN_ONLY_FORMAT")
     private static <T> void onMessageFailure(final String msg, final SettableFuture<RpcResult<T>> returnFuture,
             final Throwable failure) {
-        LOG.error(msg, failure);
+        LOG.error("{}", msg, failure);
         returnFuture.set(ClusterAdminRpcService.<T>newFailedRpcResultBuilder(String.format("%s: %s", msg,
                 failure.getMessage())).build());
     }
@@ -693,7 +749,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                     returnFuture.set(resp);
                 }
             }
-        }, configDataStore.getActorContext().getClientDispatcher());
+        }, configDataStore.getActorUtils().getClientDispatcher());
 
         return returnFuture;
     }
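
As an illustrative aside, not part of the commit itself: a minimal sketch of
how a consumer could invoke the new RPC through the ClusterAdminService
binding that this class implements. LocateShardInputBuilder and the
MemberNode choice cases (Local, LeaderActorRef) are assumed here to be the
standard mdsal-generated companions of the types imported in the diff above.

    // Hypothetical consumer sketch; builder and choice-case names beyond
    // this diff are assumed generated companions, not verified against it.
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.MoreExecutors;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardInputBuilder;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutput;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRef;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.Local;
    import org.opendaylight.yangtools.yang.common.RpcResult;

    final class LocateShardExample {
        static void locateDefaultConfigShard(final ClusterAdminService rpcService) {
            Futures.addCallback(
                rpcService.locateShard(new LocateShardInputBuilder()
                    .setShardName("default")
                    .setDataStoreType(DataStoreType.Config)
                    .build()),
                new FutureCallback<RpcResult<LocateShardOutput>>() {
                    @Override
                    public void onSuccess(final RpcResult<LocateShardOutput> result) {
                        if (!result.isSuccessful()) {
                            System.err.println("RPC reported errors: " + result.getErrors());
                            return;
                        }
                        final Object memberNode = result.getResult().getMemberNode();
                        if (memberNode instanceof Local) {
                            // Leader's data tree is co-located with this node
                            System.out.println("Shard leader is local");
                        } else if (memberNode instanceof LeaderActorRef) {
                            System.out.println("Shard leader at "
                                + ((LeaderActorRef) memberNode).getLeaderActorRef());
                        }
                    }

                    @Override
                    public void onFailure(final Throwable cause) {
                        System.err.println("locate-shard failed: " + cause);
                    }
                }, MoreExecutors.directExecutor());
        }
    }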