From: Robert Varga Date: Wed, 19 Apr 2017 12:54:55 +0000 (+0200) Subject: BUG-5581: batch read check X-Git-Tag: release/carbon~19^2~2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=85b7c8994203dd04c380f1917f2c5d2194d89e36;p=netconf.git BUG-5581: batch read check When posting data we are executing potentially a lot of reads, which can hurt if the backend is not residing on the node which is executing the operations. Fix this by issuing read requests in a batch and then collecting the results, which means we will amortize request latency. Change-Id: Ibf5628f6fdf56b7845b4a4623255ff4d417c4546 Signed-off-by: Robert Varga --- diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BatchedExistenceCheck.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BatchedExistenceCheck.java new file mode 100644 index 0000000000..89337cc99f --- /dev/null +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BatchedExistenceCheck.java @@ -0,0 +1,90 @@ +/** + * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.restconf.impl; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.Collection; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +final class BatchedExistenceCheck { + private static final AtomicIntegerFieldUpdater UPDATER = AtomicIntegerFieldUpdater.newUpdater( + BatchedExistenceCheck.class, "outstanding"); + + private final SettableFuture> future = SettableFuture.create(); + + @SuppressWarnings("unused") + private volatile int outstanding; + + private BatchedExistenceCheck(final int total) { + this.outstanding = total; + } + + static BatchedExistenceCheck start(final DOMDataReadTransaction readTx, + final LogicalDatastoreType datastore, final YangInstanceIdentifier parentPath, + final Collection> children) { + final BatchedExistenceCheck ret = new BatchedExistenceCheck(children.size()); + for (NormalizedNode child : children) { + final YangInstanceIdentifier path = parentPath.node(child.getIdentifier()); + final ListenableFuture f = readTx.exists(datastore, path); + Futures.addCallback(f, new FutureCallback() { + @Override + public void onSuccess(final Boolean result) { + ret.complete(path, result.booleanValue()); + } + + @Override + public void onFailure(final Throwable t) { + final Exception e; + if (t instanceof Exception) { + e = (Exception) t; + } else { + e = new ExecutionException(t); + } + + ret.complete(path, ReadFailedException.MAPPER.apply(e)); + } + }); + } + + return ret; + } + + Entry getFailure() throws InterruptedException { + try { + return future.get(); + } catch (ExecutionException e) { + // This should never happen + throw new IllegalStateException(e); + } + } + + private void complete(final YangInstanceIdentifier childPath, final boolean present) { + final int count = UPDATER.decrementAndGet(this); + if (present) { + future.set(new SimpleImmutableEntry<>(childPath, null)); + } else if (count == 0) { + future.set(null); + } + } + + private void complete(final YangInstanceIdentifier childPath, final ReadFailedException cause) { + UPDATER.decrementAndGet(this); + future.set(new SimpleImmutableEntry<>(childPath, cause)); + } +} diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BrokerFacade.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BrokerFacade.java index 5ff0cf07b2..07e5e92ef4 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BrokerFacade.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BrokerFacade.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; import javax.annotation.Nullable; import javax.ws.rs.core.Response.Status; @@ -885,18 +886,46 @@ public class BrokerFacade { return; } - // We are putting multiple children, we really need a create() operation, but until we have that we make do - // with a two-step process of verifying if the children exist and then putting them in. - for (final NormalizedNode child : children) { - checkItemDoesNotExists(rWTransaction, datastore, path.node(child.getIdentifier())); + final NormalizedNode emptySubtree = ImmutableNodes.fromInstanceId(schemaContext, path); + if (children.isEmpty()) { + rWTransaction.merge(datastore, YangInstanceIdentifier.create(emptySubtree.getIdentifier()), emptySubtree); + ensureParentsByMerge(datastore, path, rWTransaction, schemaContext); + return; } - final NormalizedNode emptySubtree = ImmutableNodes.fromInstanceId(schemaContext, path); + // Kick off batch existence check first... + final BatchedExistenceCheck check = BatchedExistenceCheck.start(rWTransaction, datastore, path, children); + + // ... now enqueue modifications. This relies on proper ordering of requests, i.e. these will not affect the + // result of the existence checks... rWTransaction.merge(datastore, YangInstanceIdentifier.create(emptySubtree.getIdentifier()), emptySubtree); ensureParentsByMerge(datastore, path, rWTransaction, schemaContext); for (final NormalizedNode child : children) { + // FIXME: we really want a create(YangInstanceIdentifier, NormalizedNode) method in the transaction, + // as that would allow us to skip the existence checks rWTransaction.put(datastore, path.node(child.getIdentifier()), child); } + + // ... finally collect existence checks and abort the transaction if any of them failed. + final Entry failure; + try { + failure = check.getFailure(); + } catch (InterruptedException e) { + rWTransaction.cancel(); + throw new RestconfDocumentedException("Could not determine the existence of path " + path, e); + } + + if (failure != null) { + rWTransaction.cancel(); + final ReadFailedException e = failure.getValue(); + if (e == null) { + throw new RestconfDocumentedException("Data already exists for path: " + failure.getKey(), + ErrorType.PROTOCOL, ErrorTag.DATA_EXISTS); + } + + throw new RestconfDocumentedException("Could not determine the existence of path " + failure.getKey(), e, + e.getErrorList()); + } } private static void simplePostPut(final DOMDataReadWriteTransaction rWTransaction, @@ -1196,18 +1225,6 @@ public class BrokerFacade { listener.setRegistration(registration); } - private final class PATCHStatusContextHelper { - PATCHStatusContext status; - - public PATCHStatusContext getStatus() { - return this.status; - } - - public void setStatus(final PATCHStatusContext status) { - this.status = status; - } - } - private static void ensureParentsByMerge(final LogicalDatastoreType store, final YangInstanceIdentifier normalizedPath, final DOMDataReadWriteTransaction rwTx, final SchemaContext schemaContext) { @@ -1237,4 +1254,16 @@ public class BrokerFacade { YangInstanceIdentifier.create(normalizedPathWithoutChildArgs)); rwTx.merge(store, rootNormalizedPath, parentStructure); } + + private static final class PATCHStatusContextHelper { + PATCHStatusContext status; + + public PATCHStatusContext getStatus() { + return this.status; + } + + public void setStatus(final PATCHStatusContext status) { + this.status = status; + } + } }