BUG-5581: batch read check 24/55224/3
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 19 Apr 2017 12:54:55 +0000 (14:54 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 21 Apr 2017 08:48:50 +0000 (10:48 +0200)
When posting data we are executing potentially a lot of reads,
which can hurt if the backend is not residing on the node which
is executing the operations.

Fix this by issuing read requests in a batch and then collecting
the results, which means we will amortize request latency.

Change-Id: Ibf5628f6fdf56b7845b4a4623255ff4d417c4546
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BatchedExistenceCheck.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BrokerFacade.java

diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BatchedExistenceCheck.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BatchedExistenceCheck.java
new file mode 100644 (file)
index 0000000..89337cc
--- /dev/null
@@ -0,0 +1,90 @@
+/**
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.restconf.impl;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+final class BatchedExistenceCheck {
+    private static final AtomicIntegerFieldUpdater<BatchedExistenceCheck> UPDATER = AtomicIntegerFieldUpdater.newUpdater(
+        BatchedExistenceCheck.class, "outstanding");
+
+    private final SettableFuture<Entry<YangInstanceIdentifier, ReadFailedException>> future = SettableFuture.create();
+
+    @SuppressWarnings("unused")
+    private volatile int outstanding;
+
+    private BatchedExistenceCheck(final int total) {
+        this.outstanding = total;
+    }
+
+    static BatchedExistenceCheck start(final DOMDataReadTransaction readTx,
+            final LogicalDatastoreType datastore, final YangInstanceIdentifier parentPath,
+            final Collection<? extends NormalizedNode<?, ?>> children) {
+        final BatchedExistenceCheck ret = new BatchedExistenceCheck(children.size());
+        for (NormalizedNode<?, ?> child : children) {
+            final YangInstanceIdentifier path = parentPath.node(child.getIdentifier());
+            final ListenableFuture<Boolean> f = readTx.exists(datastore, path);
+            Futures.addCallback(f, new FutureCallback<Boolean>() {
+                @Override
+                public void onSuccess(final Boolean result) {
+                    ret.complete(path, result.booleanValue());
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    final Exception e;
+                    if (t instanceof Exception) {
+                        e = (Exception) t;
+                    } else {
+                        e = new ExecutionException(t);
+                    }
+
+                    ret.complete(path, ReadFailedException.MAPPER.apply(e));
+                }
+            });
+        }
+
+        return ret;
+    }
+
+    Entry<YangInstanceIdentifier, ReadFailedException> getFailure() throws InterruptedException {
+        try {
+            return future.get();
+        } catch (ExecutionException e) {
+            // This should never happen
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private void complete(final YangInstanceIdentifier childPath, final boolean present) {
+        final int count = UPDATER.decrementAndGet(this);
+        if (present) {
+            future.set(new SimpleImmutableEntry<>(childPath, null));
+        } else if (count == 0) {
+            future.set(null);
+        }
+    }
+
+    private void complete(final YangInstanceIdentifier childPath, final ReadFailedException cause) {
+        UPDATER.decrementAndGet(this);
+        future.set(new SimpleImmutableEntry<>(childPath, cause));
+    }
+}
index 5ff0cf07b2db20d37d17187b5372402bfe93d33c..07e5e92ef4dcaa5f45a41aa2d2a9835d35ac42e0 100644 (file)
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 import javax.annotation.Nullable;
 import javax.ws.rs.core.Response.Status;
@@ -885,18 +886,46 @@ public class BrokerFacade {
             return;
         }
 
-        // We are putting multiple children, we really need a create() operation, but until we have that we make do
-        // with a two-step process of verifying if the children exist and then putting them in.
-        for (final NormalizedNode<?, ?> child : children) {
-            checkItemDoesNotExists(rWTransaction, datastore, path.node(child.getIdentifier()));
+        final NormalizedNode<?, ?> emptySubtree = ImmutableNodes.fromInstanceId(schemaContext, path);
+        if (children.isEmpty()) {
+            rWTransaction.merge(datastore, YangInstanceIdentifier.create(emptySubtree.getIdentifier()), emptySubtree);
+            ensureParentsByMerge(datastore, path, rWTransaction, schemaContext);
+            return;
         }
 
-        final NormalizedNode<?, ?> emptySubtree = ImmutableNodes.fromInstanceId(schemaContext, path);
+        // Kick off batch existence check first...
+        final BatchedExistenceCheck check = BatchedExistenceCheck.start(rWTransaction, datastore, path, children);
+
+        // ... now enqueue modifications. This relies on proper ordering of requests, i.e. these will not affect the
+        // result of the existence checks...
         rWTransaction.merge(datastore, YangInstanceIdentifier.create(emptySubtree.getIdentifier()), emptySubtree);
         ensureParentsByMerge(datastore, path, rWTransaction, schemaContext);
         for (final NormalizedNode<?, ?> child : children) {
+            // FIXME: we really want a create(YangInstanceIdentifier, NormalizedNode) method in the transaction,
+            //        as that would allow us to skip the existence checks
             rWTransaction.put(datastore, path.node(child.getIdentifier()), child);
         }
+
+        // ... finally collect existence checks and abort the transaction if any of them failed.
+        final Entry<YangInstanceIdentifier, ReadFailedException> failure;
+        try {
+            failure = check.getFailure();
+        } catch (InterruptedException e) {
+            rWTransaction.cancel();
+            throw new RestconfDocumentedException("Could not determine the existence of path " + path, e);
+        }
+
+        if (failure != null) {
+            rWTransaction.cancel();
+            final ReadFailedException e = failure.getValue();
+            if (e == null) {
+                throw new RestconfDocumentedException("Data already exists for path: " + failure.getKey(),
+                    ErrorType.PROTOCOL, ErrorTag.DATA_EXISTS);
+            }
+
+            throw new RestconfDocumentedException("Could not determine the existence of path " + failure.getKey(), e,
+                e.getErrorList());
+        }
     }
 
     private static void simplePostPut(final DOMDataReadWriteTransaction rWTransaction,
@@ -1196,18 +1225,6 @@ public class BrokerFacade {
         listener.setRegistration(registration);
     }
 
-    private final class PATCHStatusContextHelper {
-        PATCHStatusContext status;
-
-        public PATCHStatusContext getStatus() {
-            return this.status;
-        }
-
-        public void setStatus(final PATCHStatusContext status) {
-            this.status = status;
-        }
-    }
-
     private static void ensureParentsByMerge(final LogicalDatastoreType store,
             final YangInstanceIdentifier normalizedPath, final DOMDataReadWriteTransaction rwTx,
             final SchemaContext schemaContext) {
@@ -1237,4 +1254,16 @@ public class BrokerFacade {
                 YangInstanceIdentifier.create(normalizedPathWithoutChildArgs));
         rwTx.merge(store, rootNormalizedPath, parentStructure);
     }
+
+    private static final class PATCHStatusContextHelper {
+        PATCHStatusContext status;
+
+        public PATCHStatusContext getStatus() {
+            return this.status;
+        }
+
+        public void setStatus(final PATCHStatusContext status) {
+            this.status = status;
+        }
+    }
 }