BUG-2953 : Unable to read from datastore root with clustering enabled 50/18050/2
authorMoiz Raja <moraja@cisco.com>
Thu, 9 Apr 2015 17:53:55 +0000 (10:53 -0700)
committerMoiz Raja <moraja@cisco.com>
Thu, 9 Apr 2015 20:59:33 +0000 (13:59 -0700)
This patch adds a NormalizedNodeAggregator which makes it possible to read
from the root of the datastore tree.

Writing, merging and checking for the existence of the root path is not
supported yet but we could in a future patches if needed.

Change-Id: I892bdc733ec316f41926d8bbccdbbdc6c31a5c25
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java [new file with mode: 0644]

index 59c9298499c4ed0a58961b57fe40019f15214de1..5f9cc83618b760705a73a02d1260df5a77ecda03 100644 (file)
@@ -16,12 +16,16 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -178,6 +183,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return false;
     }
 
+    private boolean isRootPath(YangInstanceIdentifier path){
+        return !path.getPathArguments().iterator().hasNext();
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
 
@@ -186,21 +195,62 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} read {}", getIdentifier(), path);
 
-        throttleOperation();
-
         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(TransactionContext transactionContext) {
-                transactionContext.readData(path, proxyFuture);
-            }
-        });
+        if(isRootPath(path)){
+            readAllData(path, proxyFuture);
+        } else {
+            throttleOperation();
+
+            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    transactionContext.readData(path, proxyFuture);
+                }
+            });
+
+        }
 
         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
+    private void readAllData(final YangInstanceIdentifier path,
+                             final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+        Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+        List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
+
+        for(String shardName : allShardNames){
+            final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
+
+            throttleOperation();
+
+            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    transactionContext.readData(path, subProxyFuture);
+                }
+            });
+
+            futures.add(subProxyFuture);
+        }
+
+        final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
+
+        future.addListener(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
+                            future.get(), actorContext.getSchemaContext()));
+                } catch (InterruptedException | ExecutionException e) {
+                    proxyFuture.setException(e);
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
+    }
+
     @Override
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
 
@@ -409,6 +459,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
         String shardName = shardNameFromIdentifier(path);
+        return getOrCreateTxFutureCallback(shardName);
+    }
+
+    private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
         if(txFutureCallback == null) {
             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
index f53368d8869416cb28340ab272aedcbfab449512..17d988005fadf32c8209837671d43d0aa2981b60 100644 (file)
@@ -540,6 +540,10 @@ public class ActorContext {
         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
     }
 
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
         return ask(actorRef, message, timeout);
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java
new file mode 100644 (file)
index 0000000..eb13078
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class NormalizedNodeAggregator {
+
+    private static final ExecutorService executorService = MoreExecutors.newDirectExecutorService();
+
+    private final YangInstanceIdentifier rootIdentifier;
+    private final List<Optional<NormalizedNode<?, ?>>> nodes;
+    private final InMemoryDOMDataStore dataStore;
+
+    NormalizedNodeAggregator(YangInstanceIdentifier rootIdentifier, List<Optional<NormalizedNode<?, ?>>> nodes,
+                             SchemaContext schemaContext){
+
+        this.rootIdentifier = rootIdentifier;
+        this.nodes = nodes;
+        this.dataStore = new InMemoryDOMDataStore("aggregator", executorService);
+        this.dataStore.onGlobalContextUpdated(schemaContext);
+    }
+
+    /**
+     * Combine data from all the nodes in the list into a tree with root as rootIdentifier
+     *
+     * @param nodes
+     * @param schemaContext
+     * @return
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
+    public static Optional<NormalizedNode<?,?>> aggregate(YangInstanceIdentifier rootIdentifier,
+                                                          List<Optional<NormalizedNode<?, ?>>> nodes,
+                                                          SchemaContext schemaContext)
+            throws ExecutionException, InterruptedException {
+        return new NormalizedNodeAggregator(rootIdentifier, nodes, schemaContext).aggregate();
+    }
+
+    private Optional<NormalizedNode<?,?>> aggregate() throws ExecutionException, InterruptedException {
+        return combine().getRootNode();
+    }
+
+    private NormalizedNodeAggregator combine() throws InterruptedException, ExecutionException {
+        DOMStoreWriteTransaction domStoreWriteTransaction = dataStore.newWriteOnlyTransaction();
+
+        for(Optional<NormalizedNode<?,?>> node : nodes) {
+            if(node.isPresent()) {
+                domStoreWriteTransaction.merge(rootIdentifier, node.get());
+            }
+        }
+        DOMStoreThreePhaseCommitCohort ready = domStoreWriteTransaction.ready();
+        ready.canCommit().get();
+        ready.preCommit().get();
+        ready.commit().get();
+
+        return this;
+    }
+
+    private Optional<NormalizedNode<?, ?>> getRootNode() throws InterruptedException, ExecutionException {
+        DOMStoreReadTransaction readTransaction = dataStore.newReadOnlyTransaction();
+
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
+                readTransaction.read(rootIdentifier);
+
+        return read.get();
+    }
+
+
+}
index a2471001864c1571a8a52d4abf0b8f41e7571ea3..29a5b09c5c8e33fcb8892054564d4264dd2a711d 100644 (file)
@@ -9,6 +9,7 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
@@ -20,11 +21,14 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
@@ -45,14 +49,18 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
@@ -1353,4 +1361,79 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
     }
+
+    @Test
+    public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
+
+        SchemaContext schemaContext = SchemaContextHelper.full();
+        Configuration configuration = mock(Configuration.class);
+        doReturn(configuration).when(mockActorContext).getConfiguration();
+        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+        doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
+
+        NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
+
+        setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
+        setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
+
+        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+                YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+
+        NormalizedNode<?, ?> normalizedNode = readOptional.get();
+
+        assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
+
+        Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
+
+        for(NormalizedNode<?,?> node : collection){
+            assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
+        }
+
+        assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
+                NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
+
+        assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
+
+        assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
+                NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
+
+        assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
+    }
+
+
+    private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(getSystem().actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))).
+                when(mockActorContext).findPrimaryShardAsync(eq(shardName));
+
+        doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
+
+        ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(actorSystem.actorSelection(txActorRef.path())).
+                when(mockActorContext).actorSelection(txActorRef.path().toString());
+
+        doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, TransactionType.READ_ONLY));
+
+        doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java
new file mode 100644 (file)
index 0000000..40d3704
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class NormalizedNodeAggregatorTest {
+
+    @Test
+    public void testAggregate() throws InterruptedException, ExecutionException, ReadFailedException {
+        SchemaContext schemaContext = SchemaContextHelper.full();
+        NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
+
+        Optional<NormalizedNode<?, ?>> optional = NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
+                Lists.newArrayList(
+                        Optional.<NormalizedNode<?, ?>>of(getRootNode(expectedNode1, schemaContext)),
+                        Optional.<NormalizedNode<?, ?>>of(getRootNode(expectedNode2, schemaContext))),
+                schemaContext);
+
+
+        NormalizedNode<?,?> normalizedNode = optional.get();
+
+        assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
+
+        Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
+
+        for(NormalizedNode<?,?> node : collection){
+            assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
+        }
+
+        assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
+                findChildWithQName(collection, TestModel.TEST_QNAME) != null);
+
+        assertEquals(expectedNode1, findChildWithQName(collection, TestModel.TEST_QNAME));
+
+        assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
+                findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
+
+        assertEquals(expectedNode2, findChildWithQName(collection, CarsModel.BASE_QNAME));
+
+    }
+
+    public static NormalizedNode<?,?> getRootNode(NormalizedNode<?, ?> moduleNode, SchemaContext schemaContext) throws ReadFailedException, ExecutionException, InterruptedException {
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", Executors.newSingleThreadExecutor());
+        store.onGlobalContextUpdated(schemaContext);
+
+        DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
+
+        writeTransaction.merge(YangInstanceIdentifier.builder().node(moduleNode.getNodeType()).build(), moduleNode);
+
+        DOMStoreThreePhaseCommitCohort ready = writeTransaction.ready();
+
+        ready.canCommit().get();
+        ready.preCommit().get();
+        ready.commit().get();
+
+        DOMStoreReadTransaction readTransaction = store.newReadOnlyTransaction();
+
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = readTransaction.read(YangInstanceIdentifier.builder().build());
+
+        Optional<NormalizedNode<?, ?>> nodeOptional = read.checkedGet();
+
+        return nodeOptional.get();
+    }
+
+    public static NormalizedNode<?,?> findChildWithQName(Collection<NormalizedNode<?, ?>> collection, QName qName) {
+        for(NormalizedNode<?,?> node : collection){
+            if(node.getNodeType().equals(qName)){
+                return node;
+            }
+        }
+
+        return null;
+    }
+
+}
\ No newline at end of file