From: Robert Varga Date: Thu, 30 Jan 2025 03:41:06 +0000 (+0100) Subject: Simplify sendMessageToManagerForConfiguredShards() X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=4159a0da5485f892d52a88018b2563c73fa9b5ea;p=controller.git Simplify sendMessageToManagerForConfiguredShards() We are always sending messages to all datastores, so this refactors the basic primitive and its users to be much more friendly. Change-Id: Ia8bd477808d40f74c2faa791b84ada650cf27d27 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index 606ede6496..b5b074ff65 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore.admin; +import static org.apache.pekko.dispatch.Futures.promise; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.base.Throwables; @@ -308,7 +310,7 @@ public final class ClusterAdminRpcService { LOG.info("Moving leader to local node {} for shard {}, datastoreType {}", actorUtils.getCurrentMemberName().getName(), shardName, dataStoreType); - final var makeLeaderLocalAsk = org.apache.pekko.dispatch.Futures.promise(); + final var makeLeaderLocalAsk = promise(); actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete() { @Override public void onComplete(final Throwable failure, final ActorRef actorRef) { @@ -345,36 +347,26 @@ public final class ClusterAdminRpcService { @VisibleForTesting ListenableFuture> addReplicasForAllShards( final AddReplicasForAllShardsInput input) { LOG.info("Adding replicas for all shards"); - - final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); - - sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, AddShardReplica::new); - sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, AddShardReplica::new); - - return waitForShardResults(shardResultData, shardResults -> - new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(), - "Failed to add replica"); + return waitForShardResults( + sendMessageToManagerForConfiguredShards(AddShardReplica::new), + result -> new AddReplicasForAllShardsOutputBuilder().setShardResult(result).build(), + "Failed to add replica"); } @VisibleForTesting ListenableFuture> removeAllShardReplicas( final RemoveAllShardReplicasInput input) { LOG.info("Removing replicas for all shards"); - final String memberName = input.getMemberName(); + final var memberName = input.getMemberName(); if (Strings.isNullOrEmpty(memberName)) { return newFailedRpcResultFuture("A valid member name must be specified"); } - final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); - Function messageSupplier = shardName -> - new RemoveShardReplica(shardName, MemberName.forName(memberName)); - - sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); - sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); - - return waitForShardResults(shardResultData, - shardResults -> new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(), - " Failed to remove replica"); + return waitForShardResults( + sendMessageToManagerForConfiguredShards( + shardName -> new RemoveShardReplica(shardName, MemberName.forName(memberName))), + result -> new RemoveAllShardReplicasOutputBuilder().setShardResult(result).build(), + "Failed to remove replica"); } @VisibleForTesting @@ -421,39 +413,27 @@ public final class ClusterAdminRpcService { @VisibleForTesting ListenableFuture> changeMemberVotingStatesForAllShards( final ChangeMemberVotingStatesForAllShardsInput input) { - List memberVotingStates = input.getMemberVotingState(); + final var memberVotingStates = input.getMemberVotingState(); if (memberVotingStates == null || memberVotingStates.isEmpty()) { return newFailedRpcResultFuture("No member voting state input was specified"); } - final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); - Function messageSupplier = shardName -> - toChangeShardMembersVotingStatus(shardName, memberVotingStates); - LOG.info("Change member voting states for all shards"); - - sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); - sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); - - return waitForShardResults(shardResultData, shardResults -> - new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(), - "Failed to change member voting states"); + return waitForShardResults( + sendMessageToManagerForConfiguredShards( + shardName -> toChangeShardMembersVotingStatus(shardName, memberVotingStates)), + result -> new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(result).build(), + "Failed to change member voting states"); } @VisibleForTesting ListenableFuture> flipMemberVotingStatesForAllShards( final FlipMemberVotingStatesForAllShardsInput input) { - final var shardResultData = new ArrayList, ShardResultBuilder>>(); - final Function messageSupplier = FlipShardMembersVotingStatus::new; - LOG.info("Flip member voting states for all shards"); - - sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); - sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); - - return waitForShardResults(shardResultData, shardResults -> - new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(), - "Failed to change member voting states"); + return waitForShardResults( + sendMessageToManagerForConfiguredShards(FlipShardMembersVotingStatus::new), + result -> new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(result).build(), + "Failed to change member voting states"); } private ListenableFuture> getShardRole(final GetShardRoleInput input) { @@ -695,19 +675,24 @@ public final class ClusterAdminRpcService { return ret; } - private void sendMessageToManagerForConfiguredShards(final DataStoreType dataStoreType, - final List, ShardResultBuilder>> shardResultData, + private ArrayList, ShardResultBuilder>> sendMessageToManagerForConfiguredShards( final Function messageSupplier) { - final var actorUtils = actorUtils(dataStoreType); - final var allShardNames = actorUtils.getConfiguration().getAllShardNames(); + final var ret = new ArrayList, ShardResultBuilder>>(); - LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorUtils.getDataStoreName()); + for (var dataStoreType : DataStoreType.values()) { + final var utils = actorUtils(dataStoreType); + final var allShardNames = utils.getConfiguration().getAllShardNames(); - for (var shardName: allShardNames) { - shardResultData.add(Map.entry( - this.ask(actorUtils.getShardManager(), messageSupplier.apply(shardName), SHARD_MGR_TIMEOUT), - new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType))); + LOG.debug("Sending message to all shards {} for data store {}", allShardNames, utils.getDataStoreName()); + + for (var shardName: allShardNames) { + ret.add(Map.entry( + ask(utils.getShardManager(), messageSupplier.apply(shardName), SHARD_MGR_TIMEOUT), + new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType))); + } } + + return ret; } private ListenableFuture sendMessageToShardManager(final DataStoreType dataStoreType, final Object message) {