- public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
-
- Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
- "Read operation on write-only transaction is not allowed");
-
- LOG.debug("Tx {} read {}", getIdentifier(), path);
-
- final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
-
- if(isRootPath(path)){
- readAllData(path, proxyFuture);
- } else {
- throttleOperation();
-
- TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
- txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(TransactionContext transactionContext) {
- transactionContext.readData(path, proxyFuture);
- }
- });
-
- }
-
- return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
- }
-
- private void readAllData(final YangInstanceIdentifier path,
- final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
- Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
- List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
-
- for(String shardName : allShardNames){
- final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
-
- throttleOperation();
-
- TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
- txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(TransactionContext transactionContext) {
- transactionContext.readData(path, subProxyFuture);
- }
- });
-
- futures.add(subProxyFuture);
- }
-
- final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
-
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
- future.get(), actorContext.getSchemaContext()));
- } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
- proxyFuture.setException(e);
- }
- }
- }, actorContext.getActorSystem().dispatcher());