X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-cluster-admin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcService.java;h=6102db5348c41c519ac0d4c7d1e54f0ffecb3f45;hp=515bd9d9b7662530598ff8c519349121809aa86a;hb=4b59df006c79ffb8119152e5a8bc6aadd276c031;hpb=14c92df74247c884a43c5aaea2f154992b0ec798 diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index 515bd9d9b7..6102db5348 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -8,12 +8,15 @@ package org.opendaylight.controller.cluster.datastore.admin; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.Status.Success; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Strings; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -29,18 +32,24 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.commons.lang3.SerializationUtils; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; +import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients; +import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply; import org.opendaylight.controller.cluster.datastore.messages.GetShardRole; import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply; import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; @@ -72,12 +81,18 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutputBuilder; @@ -90,16 +105,25 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1Builder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClients; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClientsBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRefBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LocalBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * Implements the yang RPCs defined in the generated ClusterAdminService interface. @@ -110,6 +134,11 @@ public class ClusterAdminRpcService implements ClusterAdminService { private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES); private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class); + private static final @NonNull RpcResult LOCAL_SHARD_RESULT = + RpcResultBuilder.success(new LocateShardOutputBuilder() + .setMemberNode(new LocalBuilder().setLocal(Empty.getInstance()).build()) + .build()) + .build(); private final DistributedDataStoreInterface configDataStore; private final DistributedDataStoreInterface operDataStore; @@ -201,6 +230,48 @@ public class ClusterAdminRpcService implements ClusterAdminService { return returnFuture; } + @Override + public ListenableFuture> locateShard(final LocateShardInput input) { + final ActorUtils utils; + switch (input.getDataStoreType()) { + case Config: + utils = configDataStore.getActorUtils(); + break; + case Operational: + utils = operDataStore.getActorUtils(); + break; + default: + return newFailedRpcResultFuture("Unhandled datastore in " + input); + } + + final SettableFuture> ret = SettableFuture.create(); + utils.findPrimaryShardAsync(input.getShardName()).onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final PrimaryShardInfo success) throws Throwable { + if (failure != null) { + LOG.debug("Failed to find shard for {}", input, failure); + ret.setException(failure); + return; + } + + // Data tree implies local leak + if (success.getLocalShardDataTree().isPresent()) { + ret.set(LOCAL_SHARD_RESULT); + return; + } + + final ActorSelection actorPath = success.getPrimaryShardActor(); + ret.set(newSuccessfulResult(new LocateShardOutputBuilder() + .setMemberNode(new LeaderActorRefBuilder() + .setLeaderActorRef(actorPath.toSerializationFormat()) + .build()) + .build())); + } + }, utils.getClientDispatcher()); + + return ret; + } + @Override public ListenableFuture> makeLeaderLocal(final MakeLeaderLocalInput input) { final String shardName = input.getShardName(); @@ -219,8 +290,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { LOG.info("Moving leader to local node {} for shard {}, datastoreType {}", actorUtils.getCurrentMemberName().getName(), shardName, dataStoreType); - final scala.concurrent.Future localShardReply = - actorUtils.findLocalShardAsync(shardName); + final Future localShardReply = actorUtils.findLocalShardAsync(shardName); final scala.concurrent.Promise makeLeaderLocalAsk = akka.dispatch.Futures.promise(); localShardReply.onComplete(new OnComplete() { @@ -239,7 +309,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { }, actorUtils.getClientDispatcher()); final SettableFuture> future = SettableFuture.create(); - makeLeaderLocalAsk.future().onComplete(new OnComplete() { + makeLeaderLocalAsk.future().onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object success) { if (failure != null) { @@ -570,6 +640,58 @@ public class ClusterAdminRpcService implements ClusterAdminService { return returnFuture; } + + @Override + public ListenableFuture> getKnownClientsForAllShards( + final GetKnownClientsForAllShardsInput input) { + final ImmutableMap> allShardReplies = + getAllShardLeadersClients(); + return Futures.whenAllComplete(allShardReplies.values()).call(() -> processReplies(allShardReplies), + MoreExecutors.directExecutor()); + } + + private static RpcResult processReplies( + final ImmutableMap> allShardReplies) { + final Map result = Maps.newHashMapWithExpectedSize(allShardReplies.size()); + for (Entry> entry : allShardReplies.entrySet()) { + final ListenableFuture future = entry.getValue(); + final ShardResultBuilder builder = new ShardResultBuilder() + .setDataStoreType(entry.getKey().getDataStoreType()) + .setShardName(entry.getKey().getShardName()); + + final GetKnownClientsReply reply; + try { + reply = Futures.getDone(future); + } catch (ExecutionException e) { + LOG.debug("Shard {} failed to answer", entry.getKey(), e); + final ShardResult sr = builder + .setSucceeded(Boolean.FALSE) + .setErrorMessage(e.getCause().getMessage()) + .build(); + result.put(sr.key(), sr); + continue; + } + + final ShardResult sr = builder + .setSucceeded(Boolean.TRUE) + .addAugmentation(ShardResult1.class, new ShardResult1Builder() + .setKnownClients(reply.getClients().stream() + .map(client -> new KnownClientsBuilder() + .setMember(client.getFrontendId().getMemberName().toYang()) + .setType(client.getFrontendId().getClientType().toYang()) + .setGeneration(client.getYangGeneration()) + .build()) + .collect(Collectors.toMap(KnownClients::key, Function.identity()))) + .build()) + .build(); + + result.put(sr.key(), sr); + } + + return RpcResultBuilder.success(new GetKnownClientsForAllShardsOutputBuilder().setShardResult(result).build()) + .build(); + } + private static ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName, final List memberVotingStatus) { Map serverVotingStatusMap = new HashMap<>(); @@ -581,19 +703,20 @@ public class ClusterAdminRpcService implements ClusterAdminService { private static SettableFuture> waitForShardResults( final List, ShardResultBuilder>> shardResultData, - final Function, T> resultDataSupplier, + final Function, T> resultDataSupplier, final String failureLogMsgPrefix) { final SettableFuture> returnFuture = SettableFuture.create(); - final List shardResults = new ArrayList<>(); + final Map shardResults = new HashMap<>(); for (final Entry, ShardResultBuilder> entry : shardResultData) { Futures.addCallback(entry.getKey(), new FutureCallback() { @Override public void onSuccess(final Success result) { synchronized (shardResults) { - ShardResultBuilder shardResult = entry.getValue(); - LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(), - shardResult.getDataStoreType()); - shardResults.add(shardResult.setSucceeded(true).build()); + final ShardResultBuilder builder = entry.getValue(); + LOG.debug("onSuccess for shard {}, type {}", builder.getShardName(), + builder.getDataStoreType()); + final ShardResult sr = builder.setSucceeded(Boolean.TRUE).build(); + shardResults.put(sr.key(), sr); checkIfComplete(); } } @@ -601,11 +724,14 @@ public class ClusterAdminRpcService implements ClusterAdminService { @Override public void onFailure(final Throwable failure) { synchronized (shardResults) { - ShardResultBuilder shardResult = entry.getValue(); - LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(), - shardResult.getDataStoreType(), failure); - shardResults.add(shardResult.setSucceeded(false).setErrorMessage( - Throwables.getRootCause(failure).getMessage()).build()); + ShardResultBuilder builder = entry.getValue(); + LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, builder.getShardName(), + builder.getDataStoreType(), failure); + final ShardResult sr = builder + .setSucceeded(Boolean.FALSE) + .setErrorMessage(Throwables.getRootCause(failure).getMessage()) + .build(); + shardResults.put(sr.key(), sr); checkIfComplete(); } } @@ -685,7 +811,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { final SettableFuture returnFuture = SettableFuture.create(); @SuppressWarnings("unchecked") - scala.concurrent.Future askFuture = (scala.concurrent.Future) Patterns.ask(actor, message, timeout); + Future askFuture = (Future) Patterns.ask(actor, message, timeout); askFuture.onComplete(new OnComplete() { @Override public void onComplete(final Throwable failure, final T resp) { @@ -700,6 +826,38 @@ public class ClusterAdminRpcService implements ClusterAdminService { return returnFuture; } + private ImmutableMap> getAllShardLeadersClients() { + final ImmutableMap.Builder> builder = + ImmutableMap.builder(); + + addAllShardsClients(builder, DataStoreType.Config, configDataStore.getActorUtils()); + addAllShardsClients(builder, DataStoreType.Operational, operDataStore.getActorUtils()); + + return builder.build(); + } + + private static void addAllShardsClients( + final ImmutableMap.Builder> builder, + final DataStoreType type, final ActorUtils utils) { + for (String shardName : utils.getConfiguration().getAllShardNames()) { + final SettableFuture future = SettableFuture.create(); + builder.put(new ShardIdentifier(type, shardName), future); + + utils.findPrimaryShardAsync(shardName).flatMap( + info -> Patterns.ask(info.getPrimaryShardActor(), GetKnownClients.INSTANCE, SHARD_MGR_TIMEOUT), + utils.getClientDispatcher()).onComplete(new OnComplete<>() { + @Override + public void onComplete(final Throwable failure, final Object success) { + if (failure == null) { + future.set((GetKnownClientsReply) success); + } else { + future.setException(failure); + } + } + }, utils.getClientDispatcher()); + } + } + private static ListenableFuture> newFailedRpcResultFuture(final String message) { return ClusterAdminRpcService.newFailedRpcResultBuilder(message).buildFuture(); }