+ private void readAllData(final YangInstanceIdentifier path,
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+ Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+ List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
+
+ for(String shardName : allShardNames){
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
+
+ throttleOperation();
+
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
+ txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.readData(path, subProxyFuture);
+ }
+ });
+
+ futures.add(subProxyFuture);
+ }
+
+ final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
+
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
+ future.get(), actorContext.getSchemaContext()));
+ } catch (InterruptedException | ExecutionException e) {
+ proxyFuture.setException(e);
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
+ }
+