BUG-8704: rework seal mechanics to not wait during replay 51/59851/2
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 28 Jun 2017 14:47:13 +0000 (16:47 +0200)
committerRobert Varga <nite@hq.sk>
Mon, 3 Jul 2017 11:08:19 +0000 (11:08 +0000)
AbstractProxyTransaction.seal() and most notably internalSeal()
can end up pushing down messages down the connection hence they
can end up slowing down the replay process.

The replay paths end up enqueing subsequent requests anyway, so
rework the structure to split the 'seal only' and 'seal and flush'
codepaths.

Change-Id: Ie75c1ef8aa0d3d5d7ca482d383fd516077ca50b4
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 1e07329c0d800b8fea43ae0c4060aded5fd18739)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java

index 8454663..07b89e0 100644 (file)
@@ -327,36 +327,67 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     /**
-     * Seal this transaction before it is either committed or aborted.
+     * Seal this transaction before it is either committed or aborted. This method should only be invoked from
+     * application thread.
      */
     final void seal() {
         // Transition user-visible state first
-        final boolean success = SEALED_UPDATER.compareAndSet(this, 0, 1);
+        final boolean success = markSealed();
         Preconditions.checkState(success, "Proxy %s was already sealed", getIdentifier());
-        internalSeal();
+
+        if (!sealAndSend(Optional.absent())) {
+            sealSuccessor();
+        }
     }
 
-    final void ensureSealed() {
-        if (SEALED_UPDATER.compareAndSet(this, 0, 1)) {
-            internalSeal();
+    /**
+     * Internal seal propagation method, invoked when we have raced with reconnection thread. Note that there may have
+     * been multiple reconnects, so we have to make sure the action is propagate through all intermediate instances.
+     */
+    private void sealSuccessor() {
+        // Slow path: wait for the successor to complete
+        final AbstractProxyTransaction successor = awaitSuccessor();
+
+        // At this point the successor has completed transition and is possibly visible by the user thread, which is
+        // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
+        // Propagate state and seal the successor.
+        flushState(successor);
+        successor.predecessorSealed();
+    }
+
+    private void predecessorSealed() {
+        if (markSealed() && !sealAndSend(Optional.absent())) {
+            sealSuccessor();
         }
     }
 
-    private void internalSeal() {
-        doSeal();
+    void sealOnly() {
         parent.onTransactionSealed(this);
+        final boolean success = STATE_UPDATER.compareAndSet(this, OPEN, SEALED);
+        Verify.verify(success, "Attempted to replay seal on {}", this);
+    }
 
-        // Now deal with state transfer, which can occur via successor or a follow-up canCommit() or directCommit().
-        if (!STATE_UPDATER.compareAndSet(this, OPEN, SEALED)) {
-            // Slow path: wait for the successor to complete
-            final AbstractProxyTransaction successor = awaitSuccessor();
+    /**
+     * Seal this transaction and potentially send it out towards the backend. If this method reports false, the caller
+     * needs to deal with propagating the seal operation towards the successor.
+     *
+     * @param enqueuedTicks Enqueue ticks when this is invoked from replay path.
+     * @return True if seal operation was successful, false if this proxy has a successor.
+     */
+    boolean sealAndSend(final Optional<Long> enqueuedTicks) {
+        parent.onTransactionSealed(this);
 
-            // At this point the successor has completed transition and is possibly visible by the user thread, which is
-            // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
-            // Propagate state and seal the successor.
-            flushState(successor);
-            successor.ensureSealed();
-        }
+        // Transition internal state to sealed and detect presence of a successor
+        return STATE_UPDATER.compareAndSet(this, OPEN, SEALED);
+    }
+
+    /**
+     * Mark this proxy as having been sealed.
+     *
+     * @return True if this call has transitioned to sealed state.
+     */
+    final boolean markSealed() {
+        return SEALED_UPDATER.compareAndSet(this, 0, 1);
     }
 
     private void checkNotSealed() {
@@ -689,7 +720,9 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
             flushState(successor);
-            successor.ensureSealed();
+            if (successor.markSealed()) {
+                successor.sealAndSend(Optional.of(parent.currentTime()));
+            }
         }
     }
 
@@ -761,8 +794,6 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
     abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
 
-    abstract void doSeal();
-
     @GuardedBy("this")
     abstract void flushState(AbstractProxyTransaction successor);
 
index feba5fd..99806fd 100644 (file)
@@ -194,8 +194,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
                 }
             });
 
-            successor.ensureSealed();
-
+            successor.sealOnly();
             final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
             successor.sendRequest(successorReq, callback);
         } else if (request instanceof AbortLocalTransactionRequest) {
index 7cc245a..e85c86f 100644 (file)
@@ -72,11 +72,6 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
         throw new UnsupportedOperationException("Read-only snapshot");
     }
 
-    @Override
-    void doSeal() {
-        // No-op
-    }
-
     @Override
     void flushState(final AbstractProxyTransaction successor) {
         // No-op
index eee4fd0..c44e70e 100644 (file)
@@ -176,14 +176,25 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         return ret;
     }
 
-    @Override
-    void doSeal() {
-        Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", getIdentifier());
+    private void sealModification() {
+        Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", this);
         final CursorAwareDataTreeModification mod = getModification();
         mod.ready();
         sealedModification = mod;
     }
 
+    @Override
+    void sealOnly() {
+        sealModification();
+        super.sealOnly();
+    }
+
+    @Override
+    boolean sealAndSend(final com.google.common.base.Optional<Long> enqueuedTicks) {
+        sealModification();
+        return super.sealAndSend(enqueuedTicks);
+    }
+
     @Override
     void flushState(final AbstractProxyTransaction successor) {
         sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
@@ -239,14 +250,16 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         final Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
         if (maybeProtocol.isPresent()) {
             Verify.verify(callback != null, "Request {} has null callback", request);
-            ensureSealed();
+            if (markSealed()) {
+                sealOnly();
+            }
 
             switch (maybeProtocol.get()) {
                 case ABORT:
                     sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
                     break;
                 case READY:
-                    // No-op, as we have already issued a seal()
+                    // No-op, as we have already issued a sealOnly() and we are not transmitting anything
                     break;
                 case SIMPLE:
                     sendMethod.accept(commitRequest(false), callback);
@@ -264,7 +277,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
             final Consumer<Response<?, ?>> callback, final long now) {
         if (request instanceof CommitLocalTransactionRequest) {
-            sendCommit((CommitLocalTransactionRequest) request, callback);
+            enqueueRequest(rebaseCommit((CommitLocalTransactionRequest)request), callback, now);
         } else {
             super.handleReplayedLocalRequest(request, callback, now);
         }
@@ -308,7 +321,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             final Consumer<Response<?, ?>> callback) {
         if (request instanceof CommitLocalTransactionRequest) {
             Verify.verify(successor instanceof LocalReadWriteProxyTransaction);
-            ((LocalReadWriteProxyTransaction) successor).sendCommit((CommitLocalTransactionRequest)request, callback);
+            ((LocalReadWriteProxyTransaction) successor).sendRebased((CommitLocalTransactionRequest)request, callback);
             LOG.debug("Forwarded request {} to successor {}", request, successor);
         } else {
             super.forwardToLocal(successor, request, callback);
@@ -336,7 +349,11 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         return Preconditions.checkNotNull(modification, "Transaction %s is DONE", getIdentifier());
     }
 
-    private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+    private void sendRebased(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+        sendRequest(rebaseCommit(request), callback);
+    }
+
+    private CommitLocalTransactionRequest rebaseCommit(final CommitLocalTransactionRequest request) {
         // Rebase old modification on new data tree.
         final CursorAwareDataTreeModification mod = getModification();
 
@@ -344,7 +361,10 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             request.getModification().applyToCursor(cursor);
         }
 
-        ensureSealed();
-        sendRequest(commitRequest(request.isCoordinated()), callback);
+        if (markSealed()) {
+            sealOnly();
+        }
+
+        return commitRequest(request.isCoordinated());
     }
 }
index 3b5f80f..1ba9642 100644 (file)
@@ -146,9 +146,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     }
 
     private void ensureFlushedBuider() {
-        if (builderBusy) {
-            flushBuilder();
-        }
+        ensureFlushedBuider(Optional.absent());
     }
 
     private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
@@ -157,10 +155,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         }
     }
 
-    private void flushBuilder() {
-        flushBuilder(Optional.absent());
-    }
-
     private void flushBuilder(final Optional<Long> enqueuedTicks) {
         final ModifyTransactionRequest request = builder.build();
         builderBusy = false;
@@ -253,28 +247,33 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     ModifyTransactionRequest abortRequest() {
         ensureInitializedBuilder();
         builder.setAbort();
-        final ModifyTransactionRequest ret = builder.build();
         builderBusy = false;
-        return ret;
+        return builder.build();
     }
 
     @Override
     ModifyTransactionRequest commitRequest(final boolean coordinated) {
         ensureInitializedBuilder();
         builder.setCommit(coordinated);
+        builderBusy = false;
+        return builder.build();
+    }
 
-        final ModifyTransactionRequest ret = builder.build();
+    private ModifyTransactionRequest readyRequest() {
+        ensureInitializedBuilder();
+        builder.setReady();
         builderBusy = false;
-        return ret;
+        return builder.build();
     }
 
     @Override
-    void doSeal() {
+    boolean sealAndSend(final Optional<Long> enqueuedTicks) {
         if (sendReadyOnSeal) {
             ensureInitializedBuilder();
             builder.setReady();
-            flushBuilder();
+            flushBuilder(enqueuedTicks);
         }
+        return super.sealAndSend(enqueuedTicks);
     }
 
     @Override
@@ -300,7 +299,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
             if (maybeProto.isPresent()) {
-                ensureSealed();
+                // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+                // until we know what we are going to do.
+                if (markSealed()) {
+                    sealOnly();
+                }
 
                 final TransactionRequest<?> tmp;
                 switch (maybeProto.get()) {
@@ -326,7 +329,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                         });
                         break;
                     case READY:
-                        //no op
+                        tmp = readyRequest();
+                        sendRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            callback.accept(resp);
+                        });
                         break;
                     default:
                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
@@ -424,7 +431,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
             if (maybeProto.isPresent()) {
-                ensureSealed();
+                // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+                // until we know what we are going to do.
+                if (markSealed()) {
+                    sealOnly();
+                }
 
                 final TransactionRequest<?> tmp;
                 switch (maybeProto.get()) {
@@ -450,7 +461,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                         }, enqueuedTicks);
                         break;
                     case READY:
-                        //no op
+                        tmp = readyRequest();
+                        enqueueRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            cb.accept(resp);
+                        }, enqueuedTicks);
                         break;
                     default:
                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
index 7f1508d..cbd9f3d 100644 (file)
@@ -137,9 +137,9 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
     }
 
     @Test
-    public void testDoSeal() throws Exception {
+    public void testSealOnly() throws Exception {
         assertOperationThrowsException(() -> transaction.getSnapshot(), IllegalStateException.class);
-        transaction.doSeal();
+        transaction.sealOnly();
         Assert.assertEquals(modification, transaction.getSnapshot());
     }
 
@@ -148,7 +148,7 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
         final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
         final RemoteProxyTransaction successor = transactionTester.getTransaction();
         doAnswer(LocalProxyTransactionTest::applyToCursorAnswer).when(modification).applyToCursor(any());
-        transaction.doSeal();
+        transaction.sealOnly();
         transaction.flushState(successor);
         verify(modification).applyToCursor(any());
         transactionTester.getTransaction().seal();
@@ -246,4 +246,4 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
         Assert.assertEquals(coordinated, commitRequest.isCoordinated());
     }
 
-}
\ No newline at end of file
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.