X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=388dd9f4bda2fee3e82bba4660a137876cf9dbda;hb=9f1061c46af5220ad95d8d0b94411ba2fd832a50;hp=59c9298499c4ed0a58961b57fe40019f15214de1;hpb=e1a6ed792b504c2978c5259f926eaa09257c694c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 59c9298499..388dd9f4bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -16,12 +16,16 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; @@ -153,19 +158,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction> getRecordedOperationFutures() { - List> recordedOperationFutures = Lists.newArrayList(); - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if (transactionContext != null) { - transactionContext.copyRecordedOperationFutures(recordedOperationFutures); - } - } - - return recordedOperationFutures; - } - @VisibleForTesting boolean hasTransactionContext() { for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { @@ -178,6 +170,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>, ReadFailedException> read(final YangInstanceIdentifier path) { @@ -186,21 +182,62 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>> proxyFuture = SettableFuture.create(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, proxyFuture); - } - }); + if(isRootPath(path)){ + readAllData(path, proxyFuture); + } else { + throttleOperation(); + + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.readData(path, proxyFuture); + } + }); + + } return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } + private void readAllData(final YangInstanceIdentifier path, + final SettableFuture>> proxyFuture) { + Set allShardNames = actorContext.getConfiguration().getAllShardNames(); + List>>> futures = new ArrayList<>(allShardNames.size()); + + for(String shardName : allShardNames){ + final SettableFuture>> subProxyFuture = SettableFuture.create(); + + throttleOperation(); + + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.readData(path, subProxyFuture); + } + }); + + futures.add(subProxyFuture); + } + + final ListenableFuture>>> future = Futures.allAsList(futures); + + future.addListener(new Runnable() { + @Override + public void run() { + try { + proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(), + future.get(), actorContext.getSchemaContext())); + } catch (InterruptedException | ExecutionException e) { + proxyFuture.setException(e); + } + } + }, actorContext.getActorSystem().dispatcher()); + } + @Override public CheckedFuture exists(final YangInstanceIdentifier path) { @@ -409,6 +446,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction findPrimaryFuture = sendFindPrimaryShardAsync(shardName); @@ -685,10 +726,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction