X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-cluster-admin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcService.java;h=de7709938f623ff5bf4736b637707b085bd06a53;hp=4012f2c70914c618c705a92a340ef99dc448fef6;hb=f176c27a04a39be8d4a823254c8b0e924598262e;hpb=f83b2d36fdd7e953ba72492ffb684cd112aa04a6 diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index 4012f2c709..de7709938f 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -8,11 +8,11 @@ package org.opendaylight.controller.cluster.datastore.admin; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.Status.Success; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; -import com.google.common.base.Function; import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.util.concurrent.FutureCallback; @@ -31,7 +31,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.commons.lang3.SerializationUtils; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; @@ -41,6 +43,7 @@ import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVo import org.opendaylight.controller.cluster.datastore.messages.GetShardRole; import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply; import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; @@ -78,6 +81,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutputBuilder; @@ -90,10 +96,13 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRefBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LocalBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -110,6 +119,11 @@ public class ClusterAdminRpcService implements ClusterAdminService { private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES); private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class); + private static final @NonNull RpcResult LOCAL_SHARD_RESULT = + RpcResultBuilder.success(new LocateShardOutputBuilder() + .setMemberNode(new LocalBuilder().setLocal(Empty.getInstance()).build()) + .build()) + .build(); private final DistributedDataStoreInterface configDataStore; private final DistributedDataStoreInterface operDataStore; @@ -201,6 +215,48 @@ public class ClusterAdminRpcService implements ClusterAdminService { return returnFuture; } + @Override + public ListenableFuture> locateShard(final LocateShardInput input) { + final ActorUtils utils; + switch (input.getDataStoreType()) { + case Config: + utils = configDataStore.getActorUtils(); + break; + case Operational: + utils = operDataStore.getActorUtils(); + break; + default: + return newFailedRpcResultFuture("Unhandled datastore in " + input); + } + + final SettableFuture> ret = SettableFuture.create(); + utils.findPrimaryShardAsync(input.getShardName()).onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final PrimaryShardInfo success) throws Throwable { + if (failure != null) { + LOG.debug("Failed to find shard for {}", input, failure); + ret.setException(failure); + return; + } + + // Data tree implies local leak + if (success.getLocalShardDataTree().isPresent()) { + ret.set(LOCAL_SHARD_RESULT); + return; + } + + final ActorSelection actorPath = success.getPrimaryShardActor(); + ret.set(newSuccessfulResult(new LocateShardOutputBuilder() + .setMemberNode(new LeaderActorRefBuilder() + .setLeaderActorRef(actorPath.toSerializationFormat()) + .build()) + .build())); + } + }, utils.getClientDispatcher()); + + return ret; + } + @Override public ListenableFuture> makeLeaderLocal(final MakeLeaderLocalInput input) { final String shardName = input.getShardName(); @@ -342,10 +398,9 @@ public class ClusterAdminRpcService implements ClusterAdminService { LOG.info("Adding replicas for all shards"); final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); - Function messageSupplier = AddShardReplica::new; - sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); - sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); + sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, AddShardReplica::new); + sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, AddShardReplica::new); return waitForShardResults(shardResultData, shardResults -> new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(),