- private FluentFuture<Optional<NormalizedNode<?, ?>>> readAllData() {
- final Set<String> allShardNames = txContextFactory.getActorUtils().getConfiguration().getAllShardNames();
- final Collection<FluentFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
-
- for (String shardName : allShardNames) {
- futures.add(singleShardRead(shardName, YangInstanceIdentifier.empty()));
- }
-
- final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> listFuture = Futures.allAsList(futures);
- final ListenableFuture<Optional<NormalizedNode<?, ?>>> aggregateFuture;
-
- aggregateFuture = Futures.transform(listFuture, input -> {
- try {
- return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input,
- txContextFactory.getActorUtils().getSchemaContext(),
- txContextFactory.getActorUtils().getDatastoreContext().getLogicalStoreType());
- } catch (DataValidationFailedException e) {
- throw new IllegalArgumentException("Failed to aggregate", e);
- }
- }, MoreExecutors.directExecutor());
-
- return FluentFuture.from(aggregateFuture);
+ private FluentFuture<Optional<NormalizedNode>> readAllData() {
+ final var actorUtils = getActorUtils();
+ return RootScatterGather.gather(actorUtils, actorUtils.getConfiguration().getAllShardNames().stream()
+ .map(shardName -> singleShardRead(shardName, YangInstanceIdentifier.empty())));