Simplify sendMessageToManagerForConfiguredShards() 21/115021/2
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 30 Jan 2025 03:41:06 +0000 (04:41 +0100)
committerRobert Varga <nite@hq.sk>
Fri, 31 Jan 2025 06:49:36 +0000 (06:49 +0000)
We are always sending messages to all datastores, so this refactors
the basic primitive and its users to be much more friendly.

Change-Id: Ia8bd477808d40f74c2faa791b84ada650cf27d27
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java

index 606ede6496ab0be3d9ea23bd2883e88e715e1eaf..b5b074ff653608a2ebc6ab73068c11fe25c7dcdf 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore.admin;
 
+import static org.apache.pekko.dispatch.Futures.promise;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
@@ -308,7 +310,7 @@ public final class ClusterAdminRpcService {
         LOG.info("Moving leader to local node {} for shard {}, datastoreType {}",
                 actorUtils.getCurrentMemberName().getName(), shardName, dataStoreType);
 
-        final var makeLeaderLocalAsk = org.apache.pekko.dispatch.Futures.<Object>promise();
+        final var makeLeaderLocalAsk = promise();
         actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
             @Override
             public void onComplete(final Throwable failure, final ActorRef actorRef) {
@@ -345,36 +347,26 @@ public final class ClusterAdminRpcService {
     @VisibleForTesting ListenableFuture<RpcResult<AddReplicasForAllShardsOutput>> addReplicasForAllShards(
             final AddReplicasForAllShardsInput input) {
         LOG.info("Adding replicas for all shards");
-
-        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-
-        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, AddShardReplica::new);
-        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, AddShardReplica::new);
-
-        return waitForShardResults(shardResultData, shardResults ->
-                new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(),
-                "Failed to add replica");
+        return waitForShardResults(
+            sendMessageToManagerForConfiguredShards(AddShardReplica::new),
+            result -> new AddReplicasForAllShardsOutputBuilder().setShardResult(result).build(),
+            "Failed to add replica");
     }
 
     @VisibleForTesting ListenableFuture<RpcResult<RemoveAllShardReplicasOutput>> removeAllShardReplicas(
             final RemoveAllShardReplicasInput input) {
         LOG.info("Removing replicas for all shards");
 
-        final String memberName = input.getMemberName();
+        final var memberName = input.getMemberName();
         if (Strings.isNullOrEmpty(memberName)) {
             return newFailedRpcResultFuture("A valid member name must be specified");
         }
 
-        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-        Function<String, Object> messageSupplier = shardName ->
-                new RemoveShardReplica(shardName, MemberName.forName(memberName));
-
-        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
-        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
-
-        return waitForShardResults(shardResultData,
-            shardResults -> new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(),
-            "       Failed to remove replica");
+        return waitForShardResults(
+            sendMessageToManagerForConfiguredShards(
+                shardName -> new RemoveShardReplica(shardName, MemberName.forName(memberName))),
+            result -> new RemoveAllShardReplicasOutputBuilder().setShardResult(result).build(),
+            "Failed to remove replica");
     }
 
     @VisibleForTesting
@@ -421,39 +413,27 @@ public final class ClusterAdminRpcService {
     @VisibleForTesting
     ListenableFuture<RpcResult<ChangeMemberVotingStatesForAllShardsOutput>> changeMemberVotingStatesForAllShards(
             final ChangeMemberVotingStatesForAllShardsInput input) {
-        List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+        final var memberVotingStates = input.getMemberVotingState();
         if (memberVotingStates == null || memberVotingStates.isEmpty()) {
             return newFailedRpcResultFuture("No member voting state input was specified");
         }
 
-        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-        Function<String, Object> messageSupplier = shardName ->
-                toChangeShardMembersVotingStatus(shardName, memberVotingStates);
-
         LOG.info("Change member voting states for all shards");
-
-        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
-        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
-
-        return waitForShardResults(shardResultData, shardResults ->
-                new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
-                "Failed to change member voting states");
+        return waitForShardResults(
+            sendMessageToManagerForConfiguredShards(
+                shardName -> toChangeShardMembersVotingStatus(shardName, memberVotingStates)),
+            result -> new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(result).build(),
+            "Failed to change member voting states");
     }
 
     @VisibleForTesting
     ListenableFuture<RpcResult<FlipMemberVotingStatesForAllShardsOutput>> flipMemberVotingStatesForAllShards(
             final FlipMemberVotingStatesForAllShardsInput input) {
-        final var shardResultData = new ArrayList<Entry<ListenableFuture<Success>, ShardResultBuilder>>();
-        final Function<String, Object> messageSupplier = FlipShardMembersVotingStatus::new;
-
         LOG.info("Flip member voting states for all shards");
-
-        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
-        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
-
-        return waitForShardResults(shardResultData, shardResults ->
-                new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
-                "Failed to change member voting states");
+        return waitForShardResults(
+            sendMessageToManagerForConfiguredShards(FlipShardMembersVotingStatus::new),
+            result -> new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(result).build(),
+            "Failed to change member voting states");
     }
 
     private ListenableFuture<RpcResult<GetShardRoleOutput>> getShardRole(final GetShardRoleInput input) {
@@ -695,19 +675,24 @@ public final class ClusterAdminRpcService {
         return ret;
     }
 
-    private <T> void sendMessageToManagerForConfiguredShards(final DataStoreType dataStoreType,
-            final List<Entry<ListenableFuture<T>, ShardResultBuilder>> shardResultData,
+    private ArrayList<Entry<ListenableFuture<Success>, ShardResultBuilder>> sendMessageToManagerForConfiguredShards(
             final Function<String, Object> messageSupplier) {
-        final var actorUtils = actorUtils(dataStoreType);
-        final var allShardNames = actorUtils.getConfiguration().getAllShardNames();
+        final var ret = new ArrayList<Entry<ListenableFuture<Success>, ShardResultBuilder>>();
 
-        LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorUtils.getDataStoreName());
+        for (var dataStoreType : DataStoreType.values()) {
+            final var utils = actorUtils(dataStoreType);
+            final var allShardNames = utils.getConfiguration().getAllShardNames();
 
-        for (var shardName: allShardNames) {
-            shardResultData.add(Map.entry(
-                this.<T>ask(actorUtils.getShardManager(), messageSupplier.apply(shardName), SHARD_MGR_TIMEOUT),
-                new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
+            LOG.debug("Sending message to all shards {} for data store {}", allShardNames, utils.getDataStoreName());
+
+            for (var shardName: allShardNames) {
+                ret.add(Map.entry(
+                    ask(utils.getShardManager(), messageSupplier.apply(shardName), SHARD_MGR_TIMEOUT),
+                    new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
+            }
         }
+
+        return ret;
     }
 
     private <T> ListenableFuture<T> sendMessageToShardManager(final DataStoreType dataStoreType, final Object message) {