From: Moiz Raja Date: Thu, 9 Apr 2015 17:53:55 +0000 (-0700) Subject: BUG-2953 : Unable to read from datastore root with clustering enabled X-Git-Tag: release/lithium~294^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=00b5c1a7cc2c4a32ec7335bf852565cb0c90e7c4 BUG-2953 : Unable to read from datastore root with clustering enabled This patch adds a NormalizedNodeAggregator which makes it possible to read from the root of the datastore tree. Writing, merging and checking for the existence of the root path is not supported yet but we could in a future patches if needed. Change-Id: I892bdc733ec316f41926d8bbccdbbdc6c31a5c25 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 59c9298499..5f9cc83618 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -16,12 +16,16 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; @@ -178,6 +183,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>, ReadFailedException> read(final YangInstanceIdentifier path) { @@ -186,21 +195,62 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>> proxyFuture = SettableFuture.create(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, proxyFuture); - } - }); + if(isRootPath(path)){ + readAllData(path, proxyFuture); + } else { + throttleOperation(); + + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.readData(path, proxyFuture); + } + }); + + } return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } + private void readAllData(final YangInstanceIdentifier path, + final SettableFuture>> proxyFuture) { + Set allShardNames = actorContext.getConfiguration().getAllShardNames(); + List>>> futures = new ArrayList<>(allShardNames.size()); + + for(String shardName : allShardNames){ + final SettableFuture>> subProxyFuture = SettableFuture.create(); + + throttleOperation(); + + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName); + txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.readData(path, subProxyFuture); + } + }); + + futures.add(subProxyFuture); + } + + final ListenableFuture>>> future = Futures.allAsList(futures); + + future.addListener(new Runnable() { + @Override + public void run() { + try { + proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(), + future.get(), actorContext.getSchemaContext())); + } catch (InterruptedException | ExecutionException e) { + proxyFuture.setException(e); + } + } + }, actorContext.getActorSystem().dispatcher()); + } + @Override public CheckedFuture exists(final YangInstanceIdentifier path) { @@ -409,6 +459,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction findPrimaryFuture = sendFindPrimaryShardAsync(shardName); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index f53368d886..17d988005f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -540,6 +540,10 @@ public class ActorContext { return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification); } + public Configuration getConfiguration() { + return configuration; + } + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout){ return ask(actorRef, message, timeout); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java new file mode 100644 index 0000000000..eb1307897a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +public class NormalizedNodeAggregator { + + private static final ExecutorService executorService = MoreExecutors.newDirectExecutorService(); + + private final YangInstanceIdentifier rootIdentifier; + private final List>> nodes; + private final InMemoryDOMDataStore dataStore; + + NormalizedNodeAggregator(YangInstanceIdentifier rootIdentifier, List>> nodes, + SchemaContext schemaContext){ + + this.rootIdentifier = rootIdentifier; + this.nodes = nodes; + this.dataStore = new InMemoryDOMDataStore("aggregator", executorService); + this.dataStore.onGlobalContextUpdated(schemaContext); + } + + /** + * Combine data from all the nodes in the list into a tree with root as rootIdentifier + * + * @param nodes + * @param schemaContext + * @return + * @throws ExecutionException + * @throws InterruptedException + */ + public static Optional> aggregate(YangInstanceIdentifier rootIdentifier, + List>> nodes, + SchemaContext schemaContext) + throws ExecutionException, InterruptedException { + return new NormalizedNodeAggregator(rootIdentifier, nodes, schemaContext).aggregate(); + } + + private Optional> aggregate() throws ExecutionException, InterruptedException { + return combine().getRootNode(); + } + + private NormalizedNodeAggregator combine() throws InterruptedException, ExecutionException { + DOMStoreWriteTransaction domStoreWriteTransaction = dataStore.newWriteOnlyTransaction(); + + for(Optional> node : nodes) { + if(node.isPresent()) { + domStoreWriteTransaction.merge(rootIdentifier, node.get()); + } + } + DOMStoreThreePhaseCommitCohort ready = domStoreWriteTransaction.ready(); + ready.canCommit().get(); + ready.preCommit().get(); + ready.commit().get(); + + return this; + } + + private Optional> getRootNode() throws InterruptedException, ExecutionException { + DOMStoreReadTransaction readTransaction = dataStore.newReadOnlyTransaction(); + + CheckedFuture>, ReadFailedException> read = + readTransaction.read(rootIdentifier); + + return read.get(); + } + + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index a247100186..29a5b09c5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -9,6 +9,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; @@ -20,11 +21,14 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; import com.google.common.base.Optional; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; @@ -45,14 +49,18 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.Promise; @@ -1353,4 +1361,79 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); } + + @Test + public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException { + + SchemaContext schemaContext = SchemaContextHelper.full(); + Configuration configuration = mock(Configuration.class); + doReturn(configuration).when(mockActorContext).getConfiguration(); + doReturn(schemaContext).when(mockActorContext).getSchemaContext(); + doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames(); + + NormalizedNode expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + NormalizedNode expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME); + + setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext)); + setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext)); + + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + + doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); + + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + + Optional> readOptional = transactionProxy.read( + YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + + NormalizedNode normalizedNode = readOptional.get(); + + assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection); + + Collection> collection = (Collection>) normalizedNode.getValue(); + + for(NormalizedNode node : collection){ + assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode); + } + + assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found", + NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null); + + assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME)); + + assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found", + NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null); + + assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME)); + } + + + private void setUpReadData(String shardName, NormalizedNode expectedNode) { + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); + + doReturn(getSystem().actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(shardName)); + + doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString()); + + ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(actorSystem.actorSelection(txActorRef.path())). + when(mockActorContext).actorSelection(txActorRef.path().toString()); + + doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, TransactionType.READ_ONLY)); + + doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build())); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java new file mode 100644 index 0000000000..40d3704d2c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import org.junit.Test; +import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +public class NormalizedNodeAggregatorTest { + + @Test + public void testAggregate() throws InterruptedException, ExecutionException, ReadFailedException { + SchemaContext schemaContext = SchemaContextHelper.full(); + NormalizedNode expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + NormalizedNode expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME); + + Optional> optional = NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(), + Lists.newArrayList( + Optional.>of(getRootNode(expectedNode1, schemaContext)), + Optional.>of(getRootNode(expectedNode2, schemaContext))), + schemaContext); + + + NormalizedNode normalizedNode = optional.get(); + + assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection); + + Collection> collection = (Collection>) normalizedNode.getValue(); + + for(NormalizedNode node : collection){ + assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode); + } + + assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found", + findChildWithQName(collection, TestModel.TEST_QNAME) != null); + + assertEquals(expectedNode1, findChildWithQName(collection, TestModel.TEST_QNAME)); + + assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found", + findChildWithQName(collection, CarsModel.BASE_QNAME) != null); + + assertEquals(expectedNode2, findChildWithQName(collection, CarsModel.BASE_QNAME)); + + } + + public static NormalizedNode getRootNode(NormalizedNode moduleNode, SchemaContext schemaContext) throws ReadFailedException, ExecutionException, InterruptedException { + InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", Executors.newSingleThreadExecutor()); + store.onGlobalContextUpdated(schemaContext); + + DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction(); + + writeTransaction.merge(YangInstanceIdentifier.builder().node(moduleNode.getNodeType()).build(), moduleNode); + + DOMStoreThreePhaseCommitCohort ready = writeTransaction.ready(); + + ready.canCommit().get(); + ready.preCommit().get(); + ready.commit().get(); + + DOMStoreReadTransaction readTransaction = store.newReadOnlyTransaction(); + + CheckedFuture>, ReadFailedException> read = readTransaction.read(YangInstanceIdentifier.builder().build()); + + Optional> nodeOptional = read.checkedGet(); + + return nodeOptional.get(); + } + + public static NormalizedNode findChildWithQName(Collection> collection, QName qName) { + for(NormalizedNode node : collection){ + if(node.getNodeType().equals(qName)){ + return node; + } + } + + return null; + } + +} \ No newline at end of file