X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-cluster-admin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcService.java;h=8ad1553dba2268daa9c7a86fcad319997d8fa22e;hb=HEAD;hp=3e14a376453cb62b4f0e81154ee4267e162e093e;hpb=e32959e0bbc326f47c30ed7347f9a9af26813f89;p=controller.git diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index 3e14a37645..8ad1553dba 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -13,9 +13,11 @@ import akka.actor.Status.Success; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -39,7 +41,6 @@ import org.apache.commons.lang3.SerializationUtils; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; -import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; @@ -49,75 +50,79 @@ import org.opendaylight.controller.cluster.datastore.messages.GetShardRole; import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply; import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; -import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; -import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaOutputBuilder; +import org.opendaylight.controller.eos.akka.DataCenterControl; +import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenter; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShards; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastore; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShards; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShard; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenter; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShards; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShards; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShard; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicas; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1Builder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClients; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClientsBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRefBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LocalBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey; +import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.common.Empty; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.common.Uint32; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -127,35 +132,58 @@ import scala.concurrent.Future; * * @author Thomas Pantelis */ -public class ClusterAdminRpcService implements ClusterAdminService { +public final class ClusterAdminRpcService { private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES); private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class); private static final @NonNull RpcResult LOCAL_SHARD_RESULT = RpcResultBuilder.success(new LocateShardOutputBuilder() - .setMemberNode(new LocalBuilder().setLocal(Empty.getInstance()).build()) + .setMemberNode(new LocalBuilder().setLocal(Empty.value()).build()) .build()) .build(); private final DistributedDataStoreInterface configDataStore; private final DistributedDataStoreInterface operDataStore; - private final BindingNormalizedNodeSerializer serializer; private final Timeout makeLeaderLocalTimeout; + private final DataCenterControl dataCenterControl; public ClusterAdminRpcService(final DistributedDataStoreInterface configDataStore, - final DistributedDataStoreInterface operDataStore, - final BindingNormalizedNodeSerializer serializer) { + final DistributedDataStoreInterface operDataStore, + final DataCenterControl dataCenterControl) { this.configDataStore = configDataStore; this.operDataStore = operDataStore; - this.serializer = serializer; - this.makeLeaderLocalTimeout = + makeLeaderLocalTimeout = new Timeout(configDataStore.getActorUtils().getDatastoreContext() .getShardLeaderElectionTimeout().duration().$times(2)); - } - @Override - public ListenableFuture> addShardReplica(final AddShardReplicaInput input) { + this.dataCenterControl = dataCenterControl; + } + + Registration registerWith(final RpcProviderService rpcProviderService) { + return rpcProviderService.registerRpcImplementations( + (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013 + .AddShardReplica) this::addShardReplica, + (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013 + .RemoveShardReplica) this::removeShardReplica, + (LocateShard) this::locateShard, + (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013 + .MakeLeaderLocal) this::makeLeaderLocal, + (AddReplicasForAllShards) this::addReplicasForAllShards, + (RemoveAllShardReplicas) this::removeAllShardReplicas, + (ChangeMemberVotingStatesForShard) this::changeMemberVotingStatesForShard, + (ChangeMemberVotingStatesForAllShards) this::changeMemberVotingStatesForAllShards, + (FlipMemberVotingStatesForAllShards) this::flipMemberVotingStatesForAllShards, + (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013 + .GetShardRole) this::getShardRole, + (BackupDatastore) this::backupDatastore, + (GetKnownClientsForAllShards) this::getKnownClientsForAllShards, + (ActivateEosDatacenter) this::activateEosDatacenter, + (DeactivateEosDatacenter) this::deactivateEosDatacenter); + } + + @VisibleForTesting + ListenableFuture> addShardReplica(final AddShardReplicaInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); @@ -168,28 +196,27 @@ public class ClusterAdminRpcService implements ClusterAdminService { LOG.info("Adding replica for shard {}", shardName); - final SettableFuture> returnFuture = SettableFuture.create(); - ListenableFuture future = sendMessageToShardManager(dataStoreType, new AddShardReplica(shardName)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Success success) { - LOG.info("Successfully added replica for shard {}", shardName); - returnFuture.set(newSuccessfulResult(new AddShardReplicaOutputBuilder().build())); - } + final var returnFuture = SettableFuture.>create(); + Futures.addCallback(sendMessageToShardManager(dataStoreType, new AddShardReplica(shardName)), + new FutureCallback() { + @Override + public void onSuccess(final Success success) { + LOG.info("Successfully added replica for shard {}", shardName); + returnFuture.set(newSuccessfulResult(new AddShardReplicaOutputBuilder().build())); + } - @Override - public void onFailure(final Throwable failure) { - onMessageFailure(String.format("Failed to add replica for shard %s", shardName), + @Override + public void onFailure(final Throwable failure) { + onMessageFailure(String.format("Failed to add replica for shard %s", shardName), returnFuture, failure); - } - }, MoreExecutors.directExecutor()); + } + }, MoreExecutors.directExecutor()); return returnFuture; } - @Override - public ListenableFuture> removeShardReplica( - final RemoveShardReplicaInput input) { + @VisibleForTesting + ListenableFuture> removeShardReplica(final RemoveShardReplicaInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); @@ -227,8 +254,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { return returnFuture; } - @Override - public ListenableFuture> locateShard(final LocateShardInput input) { + private ListenableFuture> locateShard(final LocateShardInput input) { final ActorUtils utils; switch (input.getDataStoreType()) { case Config: @@ -269,8 +295,8 @@ public class ClusterAdminRpcService implements ClusterAdminService { return ret; } - @Override - public ListenableFuture> makeLeaderLocal(final MakeLeaderLocalInput input) { + @VisibleForTesting + ListenableFuture> makeLeaderLocal(final MakeLeaderLocalInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); @@ -324,87 +350,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { return future; } - @Override - public ListenableFuture> addPrefixShardReplica( - final AddPrefixShardReplicaInput input) { - - final InstanceIdentifier identifier = input.getShardPrefix(); - if (identifier == null) { - return newFailedRpcResultFuture("A valid shard identifier must be specified"); - } - - final DataStoreType dataStoreType = input.getDataStoreType(); - if (dataStoreType == null) { - return newFailedRpcResultFuture("A valid DataStoreType must be specified"); - } - - LOG.info("Adding replica for shard {}, datastore type {}", identifier, dataStoreType); - - final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier); - final SettableFuture> returnFuture = SettableFuture.create(); - ListenableFuture future = sendMessageToShardManager(dataStoreType, new AddPrefixShardReplica(prefix)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Success success) { - LOG.info("Successfully added replica for shard {}", prefix); - returnFuture.set(newSuccessfulResult(new AddPrefixShardReplicaOutputBuilder().build())); - } - - @Override - public void onFailure(final Throwable failure) { - onMessageFailure(String.format("Failed to add replica for shard %s", prefix), - returnFuture, failure); - } - }, MoreExecutors.directExecutor()); - - return returnFuture; - } - - @Override - public ListenableFuture> removePrefixShardReplica( - final RemovePrefixShardReplicaInput input) { - - final InstanceIdentifier identifier = input.getShardPrefix(); - if (identifier == null) { - return newFailedRpcResultFuture("A valid shard identifier must be specified"); - } - - final DataStoreType dataStoreType = input.getDataStoreType(); - if (dataStoreType == null) { - return newFailedRpcResultFuture("A valid DataStoreType must be specified"); - } - - final String memberName = input.getMemberName(); - if (Strings.isNullOrEmpty(memberName)) { - return newFailedRpcResultFuture("A valid member name must be specified"); - } - - LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", - identifier, memberName, dataStoreType); - final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier); - - final SettableFuture> returnFuture = SettableFuture.create(); - final ListenableFuture future = sendMessageToShardManager(dataStoreType, - new RemovePrefixShardReplica(prefix, MemberName.forName(memberName))); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Success success) { - LOG.info("Successfully removed replica for shard {}", prefix); - returnFuture.set(newSuccessfulResult(new RemovePrefixShardReplicaOutputBuilder().build())); - } - - @Override - public void onFailure(final Throwable failure) { - onMessageFailure(String.format("Failed to remove replica for shard %s", prefix), - returnFuture, failure); - } - }, MoreExecutors.directExecutor()); - - return returnFuture; - } - - @Override - public ListenableFuture> addReplicasForAllShards( + @VisibleForTesting ListenableFuture> addReplicasForAllShards( final AddReplicasForAllShardsInput input) { LOG.info("Adding replicas for all shards"); @@ -418,9 +364,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { "Failed to add replica"); } - - @Override - public ListenableFuture> removeAllShardReplicas( + @VisibleForTesting ListenableFuture> removeAllShardReplicas( final RemoveAllShardReplicasInput input) { LOG.info("Removing replicas for all shards"); @@ -436,56 +380,54 @@ public class ClusterAdminRpcService implements ClusterAdminService { sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); - return waitForShardResults(shardResultData, shardResults -> - new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(), - " Failed to remove replica"); + return waitForShardResults(shardResultData, + shardResults -> new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(), + " Failed to remove replica"); } - @Override - public ListenableFuture> changeMemberVotingStatesForShard( + @VisibleForTesting + ListenableFuture> changeMemberVotingStatesForShard( final ChangeMemberVotingStatesForShardInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); } - DataStoreType dataStoreType = input.getDataStoreType(); + final var dataStoreType = input.getDataStoreType(); if (dataStoreType == null) { return newFailedRpcResultFuture("A valid DataStoreType must be specified"); } - List memberVotingStates = input.getMemberVotingState(); + final var memberVotingStates = input.getMemberVotingState(); if (memberVotingStates == null || memberVotingStates.isEmpty()) { return newFailedRpcResultFuture("No member voting state input was specified"); } - ChangeShardMembersVotingStatus changeVotingStatus = toChangeShardMembersVotingStatus(shardName, - memberVotingStates); - + final var changeVotingStatus = toChangeShardMembersVotingStatus(shardName, memberVotingStates); LOG.info("Change member voting states for shard {}: {}", shardName, changeVotingStatus.getMeberVotingStatusMap()); - final SettableFuture> returnFuture = SettableFuture.create(); - ListenableFuture future = sendMessageToShardManager(dataStoreType, changeVotingStatus); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Success success) { - LOG.info("Successfully changed member voting states for shard {}", shardName); - returnFuture.set(newSuccessfulResult(new ChangeMemberVotingStatesForShardOutputBuilder().build())); - } + final var returnFuture = SettableFuture.>create(); + Futures.addCallback(sendMessageToShardManager(dataStoreType, changeVotingStatus), + new FutureCallback() { + @Override + public void onSuccess(final Success success) { + LOG.info("Successfully changed member voting states for shard {}", shardName); + returnFuture.set(newSuccessfulResult(new ChangeMemberVotingStatesForShardOutputBuilder().build())); + } - @Override - public void onFailure(final Throwable failure) { - onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName), + @Override + public void onFailure(final Throwable failure) { + onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName), returnFuture, failure); - } - }, MoreExecutors.directExecutor()); + } + }, MoreExecutors.directExecutor()); return returnFuture; } - @Override - public ListenableFuture> changeMemberVotingStatesForAllShards( + @VisibleForTesting + ListenableFuture> changeMemberVotingStatesForAllShards( final ChangeMemberVotingStatesForAllShardsInput input) { List memberVotingStates = input.getMemberVotingState(); if (memberVotingStates == null || memberVotingStates.isEmpty()) { @@ -506,11 +448,11 @@ public class ClusterAdminRpcService implements ClusterAdminService { "Failed to change member voting states"); } - @Override - public ListenableFuture> flipMemberVotingStatesForAllShards( + @VisibleForTesting + ListenableFuture> flipMemberVotingStatesForAllShards( final FlipMemberVotingStatesForAllShardsInput input) { - final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); - Function messageSupplier = FlipShardMembersVotingStatus::new; + final var shardResultData = new ArrayList, ShardResultBuilder>>(); + final Function messageSupplier = FlipShardMembersVotingStatus::new; LOG.info("Flip member voting states for all shards"); @@ -522,8 +464,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { "Failed to change member voting states"); } - @Override - public ListenableFuture> getShardRole(final GetShardRoleInput input) { + private ListenableFuture> getShardRole(final GetShardRoleInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); @@ -565,91 +506,88 @@ public class ClusterAdminRpcService implements ClusterAdminService { return returnFuture; } - @Override - public ListenableFuture> getPrefixShardRole( - final GetPrefixShardRoleInput input) { - final InstanceIdentifier identifier = input.getShardPrefix(); - if (identifier == null) { - return newFailedRpcResultFuture("A valid shard identifier must be specified"); - } + @VisibleForTesting + ListenableFuture> backupDatastore(final BackupDatastoreInput input) { + LOG.debug("backupDatastore: {}", input); - final DataStoreType dataStoreType = input.getDataStoreType(); - if (dataStoreType == null) { - return newFailedRpcResultFuture("A valid DataStoreType must be specified"); + if (Strings.isNullOrEmpty(input.getFilePath())) { + return newFailedRpcResultFuture("A valid file path must be specified"); } - LOG.info("Getting prefix shard role for shard: {}, datastore type {}", identifier, dataStoreType); + final Uint32 timeout = input.getTimeout(); + final Timeout opTimeout = timeout != null ? Timeout.apply(timeout.longValue(), TimeUnit.SECONDS) + : SHARD_MGR_TIMEOUT; - final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier); - final String shardName = ClusterUtils.getCleanShardName(prefix); - final SettableFuture> returnFuture = SettableFuture.create(); - ListenableFuture future = sendMessageToShardManager(dataStoreType, - new GetShardRole(shardName)); - Futures.addCallback(future, new FutureCallback() { + final SettableFuture> returnFuture = SettableFuture.create(); + ListenableFuture> future = sendMessageToShardManagers(new GetSnapshot(opTimeout)); + Futures.addCallback(future, new FutureCallback<>() { @Override - public void onSuccess(final GetShardRoleReply reply) { - if (reply == null) { - returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder( - "No Shard role present. Please retry..").build()); - return; - } - - LOG.info("Successfully received role:{} for shard {}", reply.getRole(), shardName); - final GetPrefixShardRoleOutputBuilder builder = new GetPrefixShardRoleOutputBuilder(); - if (reply.getRole() != null) { - builder.setRole(reply.getRole()); - } - returnFuture.set(newSuccessfulResult(builder.build())); + public void onSuccess(final List snapshots) { + saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture); } @Override public void onFailure(final Throwable failure) { - returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder( - "Failed to get shard role.", failure).build()); + onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure); } }, MoreExecutors.directExecutor()); return returnFuture; } - @Override - public ListenableFuture> backupDatastore(final BackupDatastoreInput input) { - LOG.debug("backupDatastore: {}", input); - - if (Strings.isNullOrEmpty(input.getFilePath())) { - return newFailedRpcResultFuture("A valid file path must be specified"); - } + private ListenableFuture> getKnownClientsForAllShards( + final GetKnownClientsForAllShardsInput input) { + final ImmutableMap> allShardReplies = + getAllShardLeadersClients(); + return Futures.whenAllComplete(allShardReplies.values()).call(() -> processReplies(allShardReplies), + MoreExecutors.directExecutor()); + } - final SettableFuture> returnFuture = SettableFuture.create(); - ListenableFuture> future = sendMessageToShardManagers(GetSnapshot.INSTANCE); - Futures.addCallback(future, new FutureCallback>() { + private ListenableFuture> activateEosDatacenter( + final ActivateEosDatacenterInput input) { + LOG.debug("Activating EOS Datacenter"); + final SettableFuture> future = SettableFuture.create(); + Futures.addCallback(dataCenterControl.activateDataCenter(), new FutureCallback<>() { @Override - public void onSuccess(final List snapshots) { - saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture); + public void onSuccess(final Empty result) { + LOG.debug("Successfully activated datacenter."); + future.set(RpcResultBuilder.success().build()); } @Override public void onFailure(final Throwable failure) { - onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure); + future.set(ClusterAdminRpcService.newFailedRpcResultBuilder( + "Failed to activate datacenter.", failure).build()); } }, MoreExecutors.directExecutor()); - return returnFuture; + return future; } + private ListenableFuture> deactivateEosDatacenter( + final DeactivateEosDatacenterInput input) { + LOG.debug("Deactivating EOS Datacenter"); + final SettableFuture> future = SettableFuture.create(); + Futures.addCallback(dataCenterControl.deactivateDataCenter(), new FutureCallback<>() { + @Override + public void onSuccess(final Empty result) { + LOG.debug("Successfully deactivated datacenter."); + future.set(RpcResultBuilder.success().build()); + } - @Override - public ListenableFuture> getKnownClientsForAllShards( - final GetKnownClientsForAllShardsInput input) { - final ImmutableMap> allShardReplies = - getAllShardLeadersClients(); - return Futures.whenAllComplete(allShardReplies.values()).call(() -> processReplies(allShardReplies), - MoreExecutors.directExecutor()); + @Override + public void onFailure(final Throwable failure) { + future.set(ClusterAdminRpcService.newFailedRpcResultBuilder( + "Failed to deactivate datacenter.", failure).build()); + } + }, MoreExecutors.directExecutor()); + + return future; } private static RpcResult processReplies( final ImmutableMap> allShardReplies) { - final List result = new ArrayList<>(allShardReplies.size()); + final Map result = Maps.newHashMapWithExpectedSize(allShardReplies.size()); for (Entry> entry : allShardReplies.entrySet()) { final ListenableFuture future = entry.getValue(); final ShardResultBuilder builder = new ShardResultBuilder() @@ -661,22 +599,28 @@ public class ClusterAdminRpcService implements ClusterAdminService { reply = Futures.getDone(future); } catch (ExecutionException e) { LOG.debug("Shard {} failed to answer", entry.getKey(), e); - result.add(builder.setSucceeded(Boolean.FALSE).setErrorMessage(e.getCause().getMessage()).build()); + final ShardResult sr = builder + .setSucceeded(Boolean.FALSE) + .setErrorMessage(e.getCause().getMessage()) + .build(); + result.put(sr.key(), sr); continue; } - result.add(builder - .setSucceeded(Boolean.TRUE) - .addAugmentation(ShardResult1.class, new ShardResult1Builder() - .setKnownClients(reply.getClients().stream() - .map(client -> new KnownClientsBuilder() - .setMember(client.getFrontendId().getMemberName().toYang()) - .setType(client.getFrontendId().getClientType().toYang()) - .setGeneration(client.getYangGeneration()) - .build()) - .collect(Collectors.toList())) - .build()) - .build()); + final ShardResult sr = builder + .setSucceeded(Boolean.TRUE) + .addAugmentation(new ShardResult1Builder() + .setKnownClients(reply.getClients().stream() + .map(client -> new KnownClientsBuilder() + .setMember(client.getFrontendId().getMemberName().toYang()) + .setType(client.getFrontendId().getClientType().toYang()) + .setGeneration(client.getYangGeneration()) + .build()) + .collect(Collectors.toMap(KnownClients::key, Function.identity()))) + .build()) + .build(); + + result.put(sr.key(), sr); } return RpcResultBuilder.success(new GetKnownClientsForAllShardsOutputBuilder().setShardResult(result).build()) @@ -687,26 +631,27 @@ public class ClusterAdminRpcService implements ClusterAdminService { final List memberVotingStatus) { Map serverVotingStatusMap = new HashMap<>(); for (MemberVotingState memberStatus: memberVotingStatus) { - serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting()); + serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.getVoting()); } return new ChangeShardMembersVotingStatus(shardName, serverVotingStatusMap); } private static SettableFuture> waitForShardResults( final List, ShardResultBuilder>> shardResultData, - final Function, T> resultDataSupplier, + final Function, T> resultDataSupplier, final String failureLogMsgPrefix) { final SettableFuture> returnFuture = SettableFuture.create(); - final List shardResults = new ArrayList<>(); + final Map shardResults = new HashMap<>(); for (final Entry, ShardResultBuilder> entry : shardResultData) { Futures.addCallback(entry.getKey(), new FutureCallback() { @Override public void onSuccess(final Success result) { synchronized (shardResults) { - ShardResultBuilder shardResult = entry.getValue(); - LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(), - shardResult.getDataStoreType()); - shardResults.add(shardResult.setSucceeded(true).build()); + final ShardResultBuilder builder = entry.getValue(); + LOG.debug("onSuccess for shard {}, type {}", builder.getShardName(), + builder.getDataStoreType()); + final ShardResult sr = builder.setSucceeded(Boolean.TRUE).build(); + shardResults.put(sr.key(), sr); checkIfComplete(); } } @@ -714,11 +659,14 @@ public class ClusterAdminRpcService implements ClusterAdminService { @Override public void onFailure(final Throwable failure) { synchronized (shardResults) { - ShardResultBuilder shardResult = entry.getValue(); - LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(), - shardResult.getDataStoreType(), failure); - shardResults.add(shardResult.setSucceeded(false).setErrorMessage( - Throwables.getRootCause(failure).getMessage()).build()); + ShardResultBuilder builder = entry.getValue(); + LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, builder.getShardName(), + builder.getDataStoreType(), failure); + final ShardResult sr = builder + .setSucceeded(Boolean.FALSE) + .setErrorMessage(Throwables.getRootCause(failure).getMessage()) + .build(); + shardResults.put(sr.key(), sr); checkIfComplete(); } } @@ -766,8 +714,6 @@ public class ClusterAdminRpcService implements ClusterAdminService { return ask(shardManager, message, SHARD_MGR_TIMEOUT); } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") @SuppressWarnings("checkstyle:IllegalCatch") private static void saveSnapshotsToFile(final DatastoreSnapshotList snapshots, final String fileName, final SettableFuture> returnFuture) {