*/
package org.opendaylight.controller.cluster.datastore.admin;
+import static org.apache.pekko.dispatch.Futures.promise;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
LOG.info("Moving leader to local node {} for shard {}, datastoreType {}",
actorUtils.getCurrentMemberName().getName(), shardName, dataStoreType);
- final var makeLeaderLocalAsk = org.apache.pekko.dispatch.Futures.<Object>promise();
+ final var makeLeaderLocalAsk = promise();
actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
@Override
public void onComplete(final Throwable failure, final ActorRef actorRef) {
@VisibleForTesting ListenableFuture<RpcResult<AddReplicasForAllShardsOutput>> addReplicasForAllShards(
final AddReplicasForAllShardsInput input) {
LOG.info("Adding replicas for all shards");
-
- final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-
- sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, AddShardReplica::new);
- sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, AddShardReplica::new);
-
- return waitForShardResults(shardResultData, shardResults ->
- new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(),
- "Failed to add replica");
+ return waitForShardResults(
+ sendMessageToManagerForConfiguredShards(AddShardReplica::new),
+ result -> new AddReplicasForAllShardsOutputBuilder().setShardResult(result).build(),
+ "Failed to add replica");
}
@VisibleForTesting ListenableFuture<RpcResult<RemoveAllShardReplicasOutput>> removeAllShardReplicas(
final RemoveAllShardReplicasInput input) {
LOG.info("Removing replicas for all shards");
- final String memberName = input.getMemberName();
+ final var memberName = input.getMemberName();
if (Strings.isNullOrEmpty(memberName)) {
return newFailedRpcResultFuture("A valid member name must be specified");
}
- final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
- Function<String, Object> messageSupplier = shardName ->
- new RemoveShardReplica(shardName, MemberName.forName(memberName));
-
- sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
- sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
-
- return waitForShardResults(shardResultData,
- shardResults -> new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(),
- " Failed to remove replica");
+ return waitForShardResults(
+ sendMessageToManagerForConfiguredShards(
+ shardName -> new RemoveShardReplica(shardName, MemberName.forName(memberName))),
+ result -> new RemoveAllShardReplicasOutputBuilder().setShardResult(result).build(),
+ "Failed to remove replica");
}
@VisibleForTesting
@VisibleForTesting
ListenableFuture<RpcResult<ChangeMemberVotingStatesForAllShardsOutput>> changeMemberVotingStatesForAllShards(
final ChangeMemberVotingStatesForAllShardsInput input) {
- List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+ final var memberVotingStates = input.getMemberVotingState();
if (memberVotingStates == null || memberVotingStates.isEmpty()) {
return newFailedRpcResultFuture("No member voting state input was specified");
}
- final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
- Function<String, Object> messageSupplier = shardName ->
- toChangeShardMembersVotingStatus(shardName, memberVotingStates);
-
LOG.info("Change member voting states for all shards");
-
- sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
- sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
-
- return waitForShardResults(shardResultData, shardResults ->
- new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
- "Failed to change member voting states");
+ return waitForShardResults(
+ sendMessageToManagerForConfiguredShards(
+ shardName -> toChangeShardMembersVotingStatus(shardName, memberVotingStates)),
+ result -> new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(result).build(),
+ "Failed to change member voting states");
}
@VisibleForTesting
ListenableFuture<RpcResult<FlipMemberVotingStatesForAllShardsOutput>> flipMemberVotingStatesForAllShards(
final FlipMemberVotingStatesForAllShardsInput input) {
- final var shardResultData = new ArrayList<Entry<ListenableFuture<Success>, ShardResultBuilder>>();
- final Function<String, Object> messageSupplier = FlipShardMembersVotingStatus::new;
-
LOG.info("Flip member voting states for all shards");
-
- sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
- sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
-
- return waitForShardResults(shardResultData, shardResults ->
- new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
- "Failed to change member voting states");
+ return waitForShardResults(
+ sendMessageToManagerForConfiguredShards(FlipShardMembersVotingStatus::new),
+ result -> new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(result).build(),
+ "Failed to change member voting states");
}
private ListenableFuture<RpcResult<GetShardRoleOutput>> getShardRole(final GetShardRoleInput input) {
return ret;
}
- private <T> void sendMessageToManagerForConfiguredShards(final DataStoreType dataStoreType,
- final List<Entry<ListenableFuture<T>, ShardResultBuilder>> shardResultData,
+ private ArrayList<Entry<ListenableFuture<Success>, ShardResultBuilder>> sendMessageToManagerForConfiguredShards(
final Function<String, Object> messageSupplier) {
- final var actorUtils = actorUtils(dataStoreType);
- final var allShardNames = actorUtils.getConfiguration().getAllShardNames();
+ final var ret = new ArrayList<Entry<ListenableFuture<Success>, ShardResultBuilder>>();
- LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorUtils.getDataStoreName());
+ for (var dataStoreType : DataStoreType.values()) {
+ final var utils = actorUtils(dataStoreType);
+ final var allShardNames = utils.getConfiguration().getAllShardNames();
- for (var shardName: allShardNames) {
- shardResultData.add(Map.entry(
- this.<T>ask(actorUtils.getShardManager(), messageSupplier.apply(shardName), SHARD_MGR_TIMEOUT),
- new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
+ LOG.debug("Sending message to all shards {} for data store {}", allShardNames, utils.getDataStoreName());
+
+ for (var shardName: allShardNames) {
+ ret.add(Map.entry(
+ ask(utils.getShardManager(), messageSupplier.apply(shardName), SHARD_MGR_TIMEOUT),
+ new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
+ }
}
+
+ return ret;
}
private <T> ListenableFuture<T> sendMessageToShardManager(final DataStoreType dataStoreType, final Object message) {