BUG-2953 : Unable to read from datastore root with clustering enabled
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 59c9298499c4ed0a58961b57fe40019f15214de1..5f9cc83618b760705a73a02d1260df5a77ecda03 100644 (file)
@@ -16,12 +16,16 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -178,6 +183,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return false;
     }
 
+    private boolean isRootPath(YangInstanceIdentifier path){
+        return !path.getPathArguments().iterator().hasNext();
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
 
@@ -186,21 +195,62 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} read {}", getIdentifier(), path);
 
-        throttleOperation();
-
         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(TransactionContext transactionContext) {
-                transactionContext.readData(path, proxyFuture);
-            }
-        });
+        if(isRootPath(path)){
+            readAllData(path, proxyFuture);
+        } else {
+            throttleOperation();
+
+            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    transactionContext.readData(path, proxyFuture);
+                }
+            });
+
+        }
 
         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
+    private void readAllData(final YangInstanceIdentifier path,
+                             final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+        Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+        List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
+
+        for(String shardName : allShardNames){
+            final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
+
+            throttleOperation();
+
+            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    transactionContext.readData(path, subProxyFuture);
+                }
+            });
+
+            futures.add(subProxyFuture);
+        }
+
+        final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
+
+        future.addListener(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
+                            future.get(), actorContext.getSchemaContext()));
+                } catch (InterruptedException | ExecutionException e) {
+                    proxyFuture.setException(e);
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
+    }
+
     @Override
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
 
@@ -409,6 +459,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
         String shardName = shardNameFromIdentifier(path);
+        return getOrCreateTxFutureCallback(shardName);
+    }
+
+    private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
         if(txFutureCallback == null) {
             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);