X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-cluster-admin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcService.java;h=1e2ee442774e2d731467686e8192d405de270507;hp=dcf10c9b39ec8c5df1eb6a2888a415604f5e1fb0;hb=10606a33e2c34bd15076bd605cf9c07261e760e4;hpb=8cf5612a0bf4667c3a852175ca3d9433ee3abdcf diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index dcf10c9b39..1e2ee44277 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -8,17 +8,21 @@ package org.opendaylight.controller.cluster.datastore.admin; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.Status.Success; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; -import com.google.common.base.Function; import com.google.common.base.Strings; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.FileOutputStream; import java.io.IOException; import java.util.AbstractMap.SimpleEntry; @@ -28,61 +32,89 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.commons.lang3.SerializationUtils; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; -import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; +import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients; +import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply; import org.opendaylight.controller.cluster.datastore.messages.GetShardRole; import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply; import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; -import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.eos.akka.DataCenterControl; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1Builder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClients; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClientsBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRefBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LocalBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey; +import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.common.Uint32; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * Implements the yang RPCs defined in the generated ClusterAdminService interface. @@ -93,26 +125,35 @@ public class ClusterAdminRpcService implements ClusterAdminService { private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES); private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class); + private static final @NonNull RpcResult LOCAL_SHARD_RESULT = + RpcResultBuilder.success(new LocateShardOutputBuilder() + .setMemberNode(new LocalBuilder().setLocal(Empty.value()).build()) + .build()) + .build(); private final DistributedDataStoreInterface configDataStore; private final DistributedDataStoreInterface operDataStore; private final BindingNormalizedNodeSerializer serializer; private final Timeout makeLeaderLocalTimeout; + private final DataCenterControl dataCenterControl; - public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore, - DistributedDataStoreInterface operDataStore, - BindingNormalizedNodeSerializer serializer) { + public ClusterAdminRpcService(final DistributedDataStoreInterface configDataStore, + final DistributedDataStoreInterface operDataStore, + final BindingNormalizedNodeSerializer serializer, + final DataCenterControl dataCenterControl) { this.configDataStore = configDataStore; this.operDataStore = operDataStore; this.serializer = serializer; this.makeLeaderLocalTimeout = - new Timeout(configDataStore.getActorContext().getDatastoreContext() + new Timeout(configDataStore.getActorUtils().getDatastoreContext() .getShardLeaderElectionTimeout().duration().$times(2)); + + this.dataCenterControl = dataCenterControl; } @Override - public Future> addShardReplica(final AddShardReplicaInput input) { + public ListenableFuture> addShardReplica(final AddShardReplicaInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); @@ -125,27 +166,28 @@ public class ClusterAdminRpcService implements ClusterAdminService { LOG.info("Adding replica for shard {}", shardName); - final SettableFuture> returnFuture = SettableFuture.create(); + final SettableFuture> returnFuture = SettableFuture.create(); ListenableFuture future = sendMessageToShardManager(dataStoreType, new AddShardReplica(shardName)); Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(Success success) { + public void onSuccess(final Success success) { LOG.info("Successfully added replica for shard {}", shardName); - returnFuture.set(newSuccessfulResult()); + returnFuture.set(newSuccessfulResult(new AddShardReplicaOutputBuilder().build())); } @Override - public void onFailure(Throwable failure) { + public void onFailure(final Throwable failure) { onMessageFailure(String.format("Failed to add replica for shard %s", shardName), returnFuture, failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @Override - public Future> removeShardReplica(RemoveShardReplicaInput input) { + public ListenableFuture> removeShardReplica( + final RemoveShardReplicaInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); @@ -163,28 +205,70 @@ public class ClusterAdminRpcService implements ClusterAdminService { LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", shardName, memberName, dataStoreType); - final SettableFuture> returnFuture = SettableFuture.create(); + final SettableFuture> returnFuture = SettableFuture.create(); ListenableFuture future = sendMessageToShardManager(dataStoreType, new RemoveShardReplica(shardName, MemberName.forName(memberName))); Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(Success success) { + public void onSuccess(final Success success) { LOG.info("Successfully removed replica for shard {}", shardName); - returnFuture.set(newSuccessfulResult()); + returnFuture.set(newSuccessfulResult(new RemoveShardReplicaOutputBuilder().build())); } @Override - public void onFailure(Throwable failure) { + public void onFailure(final Throwable failure) { onMessageFailure(String.format("Failed to remove replica for shard %s", shardName), returnFuture, failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @Override - public Future> makeLeaderLocal(final MakeLeaderLocalInput input) { + public ListenableFuture> locateShard(final LocateShardInput input) { + final ActorUtils utils; + switch (input.getDataStoreType()) { + case Config: + utils = configDataStore.getActorUtils(); + break; + case Operational: + utils = operDataStore.getActorUtils(); + break; + default: + return newFailedRpcResultFuture("Unhandled datastore in " + input); + } + + final SettableFuture> ret = SettableFuture.create(); + utils.findPrimaryShardAsync(input.getShardName()).onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final PrimaryShardInfo success) throws Throwable { + if (failure != null) { + LOG.debug("Failed to find shard for {}", input, failure); + ret.setException(failure); + return; + } + + // Data tree implies local leak + if (success.getLocalShardDataTree().isPresent()) { + ret.set(LOCAL_SHARD_RESULT); + return; + } + + final ActorSelection actorPath = success.getPrimaryShardActor(); + ret.set(newSuccessfulResult(new LocateShardOutputBuilder() + .setMemberNode(new LeaderActorRefBuilder() + .setLeaderActorRef(actorPath.toSerializationFormat()) + .build()) + .build())); + } + }, utils.getClientDispatcher()); + + return ret; + } + + @Override + public ListenableFuture> makeLeaderLocal(final MakeLeaderLocalInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); @@ -195,137 +279,58 @@ public class ClusterAdminRpcService implements ClusterAdminService { return newFailedRpcResultFuture("A valid DataStoreType must be specified"); } - ActorContext actorContext = dataStoreType == DataStoreType.Config - ? configDataStore.getActorContext() - : operDataStore.getActorContext(); + ActorUtils actorUtils = dataStoreType == DataStoreType.Config + ? configDataStore.getActorUtils() : operDataStore.getActorUtils(); LOG.info("Moving leader to local node {} for shard {}, datastoreType {}", - actorContext.getCurrentMemberName().getName(), shardName, dataStoreType); + actorUtils.getCurrentMemberName().getName(), shardName, dataStoreType); - final scala.concurrent.Future localShardReply = - actorContext.findLocalShardAsync(shardName); + final Future localShardReply = actorUtils.findLocalShardAsync(shardName); final scala.concurrent.Promise makeLeaderLocalAsk = akka.dispatch.Futures.promise(); localShardReply.onComplete(new OnComplete() { @Override - public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable { + public void onComplete(final Throwable failure, final ActorRef actorRef) { if (failure != null) { LOG.warn("No local shard found for {} datastoreType {} - Cannot request leadership transfer to" - + " local shard.", shardName, failure); + + " local shard.", shardName, dataStoreType, failure); makeLeaderLocalAsk.failure(failure); } else { makeLeaderLocalAsk - .completeWith(actorContext + .completeWith(actorUtils .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout)); } } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); - final SettableFuture> future = SettableFuture.create(); - makeLeaderLocalAsk.future().onComplete(new OnComplete() { + final SettableFuture> future = SettableFuture.create(); + makeLeaderLocalAsk.future().onComplete(new OnComplete<>() { @Override - public void onComplete(final Throwable failure, final Object success) throws Throwable { + public void onComplete(final Throwable failure, final Object success) { if (failure != null) { LOG.error("Leadership transfer failed for shard {}.", shardName, failure); - future.set(RpcResultBuilder.failed().withError(ErrorType.APPLICATION, + future.set(RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "leadership transfer failed", failure).build()); return; } LOG.debug("Leadership transfer complete"); - future.set(RpcResultBuilder.success().build()); + future.set(RpcResultBuilder.success(new MakeLeaderLocalOutputBuilder().build()).build()); } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); return future; } @Override - public Future> addPrefixShardReplica(final AddPrefixShardReplicaInput input) { - - final InstanceIdentifier identifier = input.getShardPrefix(); - if (identifier == null) { - return newFailedRpcResultFuture("A valid shard identifier must be specified"); - } - - final DataStoreType dataStoreType = input.getDataStoreType(); - if (dataStoreType == null) { - return newFailedRpcResultFuture("A valid DataStoreType must be specified"); - } - - LOG.info("Adding replica for shard {}, datastore type {}", identifier, dataStoreType); - - final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier); - final SettableFuture> returnFuture = SettableFuture.create(); - ListenableFuture future = sendMessageToShardManager(dataStoreType, new AddPrefixShardReplica(prefix)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Success success) { - LOG.info("Successfully added replica for shard {}", prefix); - returnFuture.set(newSuccessfulResult()); - } - - @Override - public void onFailure(Throwable failure) { - onMessageFailure(String.format("Failed to add replica for shard %s", prefix), - returnFuture, failure); - } - }); - - return returnFuture; - } - - @Override - public Future> removePrefixShardReplica(final RemovePrefixShardReplicaInput input) { - - final InstanceIdentifier identifier = input.getShardPrefix(); - if (identifier == null) { - return newFailedRpcResultFuture("A valid shard identifier must be specified"); - } - - final DataStoreType dataStoreType = input.getDataStoreType(); - if (dataStoreType == null) { - return newFailedRpcResultFuture("A valid DataStoreType must be specified"); - } - - final String memberName = input.getMemberName(); - if (Strings.isNullOrEmpty(memberName)) { - return newFailedRpcResultFuture("A valid member name must be specified"); - } - - LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", - identifier, memberName, dataStoreType); - final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier); - - final SettableFuture> returnFuture = SettableFuture.create(); - final ListenableFuture future = sendMessageToShardManager(dataStoreType, - new RemovePrefixShardReplica(prefix, MemberName.forName(memberName))); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Success success) { - LOG.info("Successfully removed replica for shard {}", prefix); - returnFuture.set(newSuccessfulResult()); - } - - @Override - public void onFailure(final Throwable failure) { - onMessageFailure(String.format("Failed to remove replica for shard %s", prefix), - returnFuture, failure); - } - }); - - return returnFuture; - } - - @Override - public Future> addReplicasForAllShards() { + public ListenableFuture> addReplicasForAllShards( + final AddReplicasForAllShardsInput input) { LOG.info("Adding replicas for all shards"); final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); - Function messageSupplier = AddShardReplica::new; - sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); - sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); + sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, AddShardReplica::new); + sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, AddShardReplica::new); return waitForShardResults(shardResultData, shardResults -> new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(), @@ -334,7 +339,8 @@ public class ClusterAdminRpcService implements ClusterAdminService { @Override - public Future> removeAllShardReplicas(RemoveAllShardReplicasInput input) { + public ListenableFuture> removeAllShardReplicas( + final RemoveAllShardReplicasInput input) { LOG.info("Removing replicas for all shards"); final String memberName = input.getMemberName(); @@ -355,7 +361,8 @@ public class ClusterAdminRpcService implements ClusterAdminService { } @Override - public Future> changeMemberVotingStatesForShard(ChangeMemberVotingStatesForShardInput input) { + public ListenableFuture> changeMemberVotingStatesForShard( + final ChangeMemberVotingStatesForShardInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); @@ -377,27 +384,27 @@ public class ClusterAdminRpcService implements ClusterAdminService { LOG.info("Change member voting states for shard {}: {}", shardName, changeVotingStatus.getMeberVotingStatusMap()); - final SettableFuture> returnFuture = SettableFuture.create(); + final SettableFuture> returnFuture = SettableFuture.create(); ListenableFuture future = sendMessageToShardManager(dataStoreType, changeVotingStatus); Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(Success success) { + public void onSuccess(final Success success) { LOG.info("Successfully changed member voting states for shard {}", shardName); - returnFuture.set(newSuccessfulResult()); + returnFuture.set(newSuccessfulResult(new ChangeMemberVotingStatesForShardOutputBuilder().build())); } @Override - public void onFailure(Throwable failure) { + public void onFailure(final Throwable failure) { onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName), returnFuture, failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @Override - public Future> changeMemberVotingStatesForAllShards( + public ListenableFuture> changeMemberVotingStatesForAllShards( final ChangeMemberVotingStatesForAllShardsInput input) { List memberVotingStates = input.getMemberVotingState(); if (memberVotingStates == null || memberVotingStates.isEmpty()) { @@ -419,7 +426,8 @@ public class ClusterAdminRpcService implements ClusterAdminService { } @Override - public Future> flipMemberVotingStatesForAllShards() { + public ListenableFuture> flipMemberVotingStatesForAllShards( + final FlipMemberVotingStatesForAllShardsInput input) { final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); Function messageSupplier = FlipShardMembersVotingStatus::new; @@ -434,7 +442,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { } @Override - public Future> getShardRole(final GetShardRoleInput input) { + public ListenableFuture> getShardRole(final GetShardRoleInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); @@ -471,121 +479,176 @@ public class ClusterAdminRpcService implements ClusterAdminService { returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder( "Failed to get shard role.", failure).build()); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @Override - public Future> getPrefixShardRole(final GetPrefixShardRoleInput input) { - final InstanceIdentifier identifier = input.getShardPrefix(); - if (identifier == null) { - return newFailedRpcResultFuture("A valid shard identifier must be specified"); - } + public ListenableFuture> backupDatastore(final BackupDatastoreInput input) { + LOG.debug("backupDatastore: {}", input); - final DataStoreType dataStoreType = input.getDataStoreType(); - if (dataStoreType == null) { - return newFailedRpcResultFuture("A valid DataStoreType must be specified"); + if (Strings.isNullOrEmpty(input.getFilePath())) { + return newFailedRpcResultFuture("A valid file path must be specified"); } - LOG.info("Getting prefix shard role for shard: {}, datastore type {}", identifier, dataStoreType); + final Uint32 timeout = input.getTimeout(); + final Timeout opTimeout = timeout != null ? Timeout.apply(timeout.longValue(), TimeUnit.SECONDS) + : SHARD_MGR_TIMEOUT; - final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier); - final String shardName = ClusterUtils.getCleanShardName(prefix); - final SettableFuture> returnFuture = SettableFuture.create(); - ListenableFuture future = sendMessageToShardManager(dataStoreType, - new GetShardRole(shardName)); - Futures.addCallback(future, new FutureCallback() { + final SettableFuture> returnFuture = SettableFuture.create(); + ListenableFuture> future = sendMessageToShardManagers(new GetSnapshot(opTimeout)); + Futures.addCallback(future, new FutureCallback<>() { @Override - public void onSuccess(final GetShardRoleReply reply) { - if (reply == null) { - returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder( - "No Shard role present. Please retry..").build()); - return; - } - - LOG.info("Successfully received role:{} for shard {}", reply.getRole(), shardName); - final GetPrefixShardRoleOutputBuilder builder = new GetPrefixShardRoleOutputBuilder(); - if (reply.getRole() != null) { - builder.setRole(reply.getRole()); - } - returnFuture.set(newSuccessfulResult(builder.build())); + public void onSuccess(final List snapshots) { + saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture); } @Override public void onFailure(final Throwable failure) { - returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder( - "Failed to get shard role.", failure).build()); + onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } + @Override - public Future> backupDatastore(final BackupDatastoreInput input) { - LOG.debug("backupDatastore: {}", input); + public ListenableFuture> getKnownClientsForAllShards( + final GetKnownClientsForAllShardsInput input) { + final ImmutableMap> allShardReplies = + getAllShardLeadersClients(); + return Futures.whenAllComplete(allShardReplies.values()).call(() -> processReplies(allShardReplies), + MoreExecutors.directExecutor()); + } - if (Strings.isNullOrEmpty(input.getFilePath())) { - return newFailedRpcResultFuture("A valid file path must be specified"); - } + @Override + public ListenableFuture> activateEosDatacenter( + final ActivateEosDatacenterInput input) { + LOG.debug("Activating EOS Datacenter"); + final SettableFuture> future = SettableFuture.create(); + Futures.addCallback(dataCenterControl.activateDataCenter(), new FutureCallback<>() { + @Override + public void onSuccess(final Empty result) { + LOG.debug("Successfully activated datacenter."); + future.set(RpcResultBuilder.success().build()); + } - final SettableFuture> returnFuture = SettableFuture.create(); - ListenableFuture> future = sendMessageToShardManagers(GetSnapshot.INSTANCE); - Futures.addCallback(future, new FutureCallback>() { @Override - public void onSuccess(List snapshots) { - saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture); + public void onFailure(final Throwable failure) { + future.set(ClusterAdminRpcService.newFailedRpcResultBuilder( + "Failed to activate datacenter.", failure).build()); } + }, MoreExecutors.directExecutor()); + return future; + } + + @Override + public ListenableFuture> deactivateEosDatacenter( + final DeactivateEosDatacenterInput input) { + LOG.debug("Deactivating EOS Datacenter"); + final SettableFuture> future = SettableFuture.create(); + Futures.addCallback(dataCenterControl.deactivateDataCenter(), new FutureCallback<>() { @Override - public void onFailure(Throwable failure) { - onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure); + public void onSuccess(final Empty result) { + LOG.debug("Successfully deactivated datacenter."); + future.set(RpcResultBuilder.success().build()); } - }); - return returnFuture; + @Override + public void onFailure(final Throwable failure) { + future.set(ClusterAdminRpcService.newFailedRpcResultBuilder( + "Failed to deactivate datacenter.", failure).build()); + } + }, MoreExecutors.directExecutor()); + + return future; } - private ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName, - List memberVotingStatus) { + private static RpcResult processReplies( + final ImmutableMap> allShardReplies) { + final Map result = Maps.newHashMapWithExpectedSize(allShardReplies.size()); + for (Entry> entry : allShardReplies.entrySet()) { + final ListenableFuture future = entry.getValue(); + final ShardResultBuilder builder = new ShardResultBuilder() + .setDataStoreType(entry.getKey().getDataStoreType()) + .setShardName(entry.getKey().getShardName()); + + final GetKnownClientsReply reply; + try { + reply = Futures.getDone(future); + } catch (ExecutionException e) { + LOG.debug("Shard {} failed to answer", entry.getKey(), e); + final ShardResult sr = builder + .setSucceeded(Boolean.FALSE) + .setErrorMessage(e.getCause().getMessage()) + .build(); + result.put(sr.key(), sr); + continue; + } + + final ShardResult sr = builder + .setSucceeded(Boolean.TRUE) + .addAugmentation(new ShardResult1Builder() + .setKnownClients(reply.getClients().stream() + .map(client -> new KnownClientsBuilder() + .setMember(client.getFrontendId().getMemberName().toYang()) + .setType(client.getFrontendId().getClientType().toYang()) + .setGeneration(client.getYangGeneration()) + .build()) + .collect(Collectors.toMap(KnownClients::key, Function.identity()))) + .build()) + .build(); + + result.put(sr.key(), sr); + } + + return RpcResultBuilder.success(new GetKnownClientsForAllShardsOutputBuilder().setShardResult(result).build()) + .build(); + } + + private static ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName, + final List memberVotingStatus) { Map serverVotingStatusMap = new HashMap<>(); for (MemberVotingState memberStatus: memberVotingStatus) { - serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting()); + serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.getVoting()); } - - ChangeShardMembersVotingStatus changeVotingStatus = new ChangeShardMembersVotingStatus(shardName, - serverVotingStatusMap); - return changeVotingStatus; + return new ChangeShardMembersVotingStatus(shardName, serverVotingStatusMap); } private static SettableFuture> waitForShardResults( final List, ShardResultBuilder>> shardResultData, - final Function, T> resultDataSupplier, + final Function, T> resultDataSupplier, final String failureLogMsgPrefix) { final SettableFuture> returnFuture = SettableFuture.create(); - final List shardResults = new ArrayList<>(); + final Map shardResults = new HashMap<>(); for (final Entry, ShardResultBuilder> entry : shardResultData) { Futures.addCallback(entry.getKey(), new FutureCallback() { @Override - public void onSuccess(Success result) { + public void onSuccess(final Success result) { synchronized (shardResults) { - ShardResultBuilder shardResult = entry.getValue(); - LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(), - shardResult.getDataStoreType()); - shardResults.add(shardResult.setSucceeded(true).build()); + final ShardResultBuilder builder = entry.getValue(); + LOG.debug("onSuccess for shard {}, type {}", builder.getShardName(), + builder.getDataStoreType()); + final ShardResult sr = builder.setSucceeded(Boolean.TRUE).build(); + shardResults.put(sr.key(), sr); checkIfComplete(); } } @Override - public void onFailure(Throwable failure) { + public void onFailure(final Throwable failure) { synchronized (shardResults) { - ShardResultBuilder shardResult = entry.getValue(); - LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(), - shardResult.getDataStoreType(), failure); - shardResults.add(shardResult.setSucceeded(false).setErrorMessage( - Throwables.getRootCause(failure).getMessage()).build()); + ShardResultBuilder builder = entry.getValue(); + LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, builder.getShardName(), + builder.getDataStoreType(), failure); + final ShardResult sr = builder + .setSucceeded(Boolean.FALSE) + .setErrorMessage(Throwables.getRootCause(failure).getMessage()) + .build(); + shardResults.put(sr.key(), sr); checkIfComplete(); } } @@ -596,105 +659,135 @@ public class ClusterAdminRpcService implements ClusterAdminService { returnFuture.set(newSuccessfulResult(resultDataSupplier.apply(shardResults))); } } - }); + }, MoreExecutors.directExecutor()); } return returnFuture; } - private void sendMessageToManagerForConfiguredShards(DataStoreType dataStoreType, - List, ShardResultBuilder>> shardResultData, - Function messageSupplier) { - ActorContext actorContext = dataStoreType == DataStoreType.Config ? configDataStore.getActorContext() - : operDataStore.getActorContext(); - Set allShardNames = actorContext.getConfiguration().getAllShardNames(); + private void sendMessageToManagerForConfiguredShards(final DataStoreType dataStoreType, + final List, ShardResultBuilder>> shardResultData, + final Function messageSupplier) { + ActorUtils actorUtils = dataStoreType == DataStoreType.Config ? configDataStore.getActorUtils() + : operDataStore.getActorUtils(); + Set allShardNames = actorUtils.getConfiguration().getAllShardNames(); - LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreName()); + LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorUtils.getDataStoreName()); for (String shardName: allShardNames) { - ListenableFuture future = this.ask(actorContext.getShardManager(), messageSupplier.apply(shardName), - SHARD_MGR_TIMEOUT); + ListenableFuture future = this.ask(actorUtils.getShardManager(), messageSupplier.apply(shardName), + SHARD_MGR_TIMEOUT); shardResultData.add(new SimpleEntry<>(future, new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType))); } } - @SuppressWarnings("unchecked") - private ListenableFuture> sendMessageToShardManagers(Object message) { + private ListenableFuture> sendMessageToShardManagers(final Object message) { Timeout timeout = SHARD_MGR_TIMEOUT; - ListenableFuture configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout); - ListenableFuture operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout); + ListenableFuture configFuture = ask(configDataStore.getActorUtils().getShardManager(), message, timeout); + ListenableFuture operFuture = ask(operDataStore.getActorUtils().getShardManager(), message, timeout); return Futures.allAsList(configFuture, operFuture); } - private ListenableFuture sendMessageToShardManager(DataStoreType dataStoreType, Object message) { + private ListenableFuture sendMessageToShardManager(final DataStoreType dataStoreType, final Object message) { ActorRef shardManager = dataStoreType == DataStoreType.Config - ? configDataStore.getActorContext().getShardManager() - : operDataStore.getActorContext().getShardManager(); + ? configDataStore.getActorUtils().getShardManager() + : operDataStore.getActorUtils().getShardManager(); return ask(shardManager, message, SHARD_MGR_TIMEOUT); } + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") @SuppressWarnings("checkstyle:IllegalCatch") - private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName, - SettableFuture> returnFuture) { + private static void saveSnapshotsToFile(final DatastoreSnapshotList snapshots, final String fileName, + final SettableFuture> returnFuture) { try (FileOutputStream fos = new FileOutputStream(fileName)) { SerializationUtils.serialize(snapshots, fos); - returnFuture.set(newSuccessfulResult()); + returnFuture.set(newSuccessfulResult(new BackupDatastoreOutputBuilder().build())); LOG.info("Successfully backed up datastore to file {}", fileName); } catch (IOException | RuntimeException e) { onDatastoreBackupFailure(fileName, returnFuture, e); } } - private static void onDatastoreBackupFailure(String fileName, SettableFuture> returnFuture, - Throwable failure) { + private static void onDatastoreBackupFailure(final String fileName, + final SettableFuture> returnFuture, final Throwable failure) { onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure); } - private static void onMessageFailure(String msg, final SettableFuture> returnFuture, - Throwable failure) { - LOG.error(msg, failure); - returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder(String.format("%s: %s", msg, + @SuppressFBWarnings("SLF4J_SIGN_ONLY_FORMAT") + private static void onMessageFailure(final String msg, final SettableFuture> returnFuture, + final Throwable failure) { + LOG.error("{}", msg, failure); + returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build()); } - private ListenableFuture ask(ActorRef actor, Object message, Timeout timeout) { + private ListenableFuture ask(final ActorRef actor, final Object message, final Timeout timeout) { final SettableFuture returnFuture = SettableFuture.create(); @SuppressWarnings("unchecked") - scala.concurrent.Future askFuture = (scala.concurrent.Future) Patterns.ask(actor, message, timeout); + Future askFuture = (Future) Patterns.ask(actor, message, timeout); askFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, T resp) { + public void onComplete(final Throwable failure, final T resp) { if (failure != null) { returnFuture.setException(failure); } else { returnFuture.set(resp); } } - }, configDataStore.getActorContext().getClientDispatcher()); + }, configDataStore.getActorUtils().getClientDispatcher()); return returnFuture; } - private static ListenableFuture> newFailedRpcResultFuture(String message) { + private ImmutableMap> getAllShardLeadersClients() { + final ImmutableMap.Builder> builder = + ImmutableMap.builder(); + + addAllShardsClients(builder, DataStoreType.Config, configDataStore.getActorUtils()); + addAllShardsClients(builder, DataStoreType.Operational, operDataStore.getActorUtils()); + + return builder.build(); + } + + private static void addAllShardsClients( + final ImmutableMap.Builder> builder, + final DataStoreType type, final ActorUtils utils) { + for (String shardName : utils.getConfiguration().getAllShardNames()) { + final SettableFuture future = SettableFuture.create(); + builder.put(new ShardIdentifier(type, shardName), future); + + utils.findPrimaryShardAsync(shardName).flatMap( + info -> Patterns.ask(info.getPrimaryShardActor(), GetKnownClients.INSTANCE, SHARD_MGR_TIMEOUT), + utils.getClientDispatcher()).onComplete(new OnComplete<>() { + @Override + public void onComplete(final Throwable failure, final Object success) { + if (failure == null) { + future.set((GetKnownClientsReply) success); + } else { + future.setException(failure); + } + } + }, utils.getClientDispatcher()); + } + } + + private static ListenableFuture> newFailedRpcResultFuture(final String message) { return ClusterAdminRpcService.newFailedRpcResultBuilder(message).buildFuture(); } - private static RpcResultBuilder newFailedRpcResultBuilder(String message) { + private static RpcResultBuilder newFailedRpcResultBuilder(final String message) { return newFailedRpcResultBuilder(message, null); } - private static RpcResultBuilder newFailedRpcResultBuilder(String message, Throwable cause) { + private static RpcResultBuilder newFailedRpcResultBuilder(final String message, final Throwable cause) { return RpcResultBuilder.failed().withError(ErrorType.RPC, message, cause); } - private static RpcResult newSuccessfulResult() { - return newSuccessfulResult((Void)null); - } - - private static RpcResult newSuccessfulResult(T data) { - return RpcResultBuilder.success(data).build(); + private static RpcResult newSuccessfulResult(final T data) { + return RpcResultBuilder.success(data).build(); } }