BUG-8704: rework seal mechanics to not wait during replay 54/59654/7
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 28 Jun 2017 14:47:13 +0000 (16:47 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 29 Jun 2017 15:40:44 +0000 (17:40 +0200)
AbstractProxyTransaction.seal() and most notably internalSeal()
can end up pushing down messages down the connection hence they
can end up slowing down the replay process.

The replay paths end up enqueing subsequent requests anyway, so
rework the structure to split the 'seal only' and 'seal and flush'
codepaths.

Change-Id: Ie75c1ef8aa0d3d5d7ca482d383fd516077ca50b4
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java

index 84546632f099bb3302938f6d0e8990c3f93e5e3b..07b89e09230949da6c4849b3fb5dc03d4c3c36d8 100644 (file)
@@ -327,36 +327,67 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     /**
-     * Seal this transaction before it is either committed or aborted.
+     * Seal this transaction before it is either committed or aborted. This method should only be invoked from
+     * application thread.
      */
     final void seal() {
         // Transition user-visible state first
-        final boolean success = SEALED_UPDATER.compareAndSet(this, 0, 1);
+        final boolean success = markSealed();
         Preconditions.checkState(success, "Proxy %s was already sealed", getIdentifier());
-        internalSeal();
+
+        if (!sealAndSend(Optional.absent())) {
+            sealSuccessor();
+        }
     }
 
-    final void ensureSealed() {
-        if (SEALED_UPDATER.compareAndSet(this, 0, 1)) {
-            internalSeal();
+    /**
+     * Internal seal propagation method, invoked when we have raced with reconnection thread. Note that there may have
+     * been multiple reconnects, so we have to make sure the action is propagate through all intermediate instances.
+     */
+    private void sealSuccessor() {
+        // Slow path: wait for the successor to complete
+        final AbstractProxyTransaction successor = awaitSuccessor();
+
+        // At this point the successor has completed transition and is possibly visible by the user thread, which is
+        // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
+        // Propagate state and seal the successor.
+        flushState(successor);
+        successor.predecessorSealed();
+    }
+
+    private void predecessorSealed() {
+        if (markSealed() && !sealAndSend(Optional.absent())) {
+            sealSuccessor();
         }
     }
 
-    private void internalSeal() {
-        doSeal();
+    void sealOnly() {
         parent.onTransactionSealed(this);
+        final boolean success = STATE_UPDATER.compareAndSet(this, OPEN, SEALED);
+        Verify.verify(success, "Attempted to replay seal on {}", this);
+    }
 
-        // Now deal with state transfer, which can occur via successor or a follow-up canCommit() or directCommit().
-        if (!STATE_UPDATER.compareAndSet(this, OPEN, SEALED)) {
-            // Slow path: wait for the successor to complete
-            final AbstractProxyTransaction successor = awaitSuccessor();
+    /**
+     * Seal this transaction and potentially send it out towards the backend. If this method reports false, the caller
+     * needs to deal with propagating the seal operation towards the successor.
+     *
+     * @param enqueuedTicks Enqueue ticks when this is invoked from replay path.
+     * @return True if seal operation was successful, false if this proxy has a successor.
+     */
+    boolean sealAndSend(final Optional<Long> enqueuedTicks) {
+        parent.onTransactionSealed(this);
 
-            // At this point the successor has completed transition and is possibly visible by the user thread, which is
-            // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
-            // Propagate state and seal the successor.
-            flushState(successor);
-            successor.ensureSealed();
-        }
+        // Transition internal state to sealed and detect presence of a successor
+        return STATE_UPDATER.compareAndSet(this, OPEN, SEALED);
+    }
+
+    /**
+     * Mark this proxy as having been sealed.
+     *
+     * @return True if this call has transitioned to sealed state.
+     */
+    final boolean markSealed() {
+        return SEALED_UPDATER.compareAndSet(this, 0, 1);
     }
 
     private void checkNotSealed() {
@@ -689,7 +720,9 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
             flushState(successor);
-            successor.ensureSealed();
+            if (successor.markSealed()) {
+                successor.sealAndSend(Optional.of(parent.currentTime()));
+            }
         }
     }
 
@@ -761,8 +794,6 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
     abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
 
-    abstract void doSeal();
-
     @GuardedBy("this")
     abstract void flushState(AbstractProxyTransaction successor);
 
index feba5fde5bb688c6b7654b2062a61da0560fc75a..99806fd81f8bbaa27b97c9fc37563abe66bd0ee2 100644 (file)
@@ -194,8 +194,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
                 }
             });
 
-            successor.ensureSealed();
-
+            successor.sealOnly();
             final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
             successor.sendRequest(successorReq, callback);
         } else if (request instanceof AbortLocalTransactionRequest) {
index 7cc245a42f302871726cb2a4a297a2b82fb09c61..e85c86f5b53e985771d6f470f8d75d363eca0ab0 100644 (file)
@@ -72,11 +72,6 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
         throw new UnsupportedOperationException("Read-only snapshot");
     }
 
-    @Override
-    void doSeal() {
-        // No-op
-    }
-
     @Override
     void flushState(final AbstractProxyTransaction successor) {
         // No-op
index eee4fd0e137f714b86d24c3279d717c9c323a051..c44e70eb304f4458aeb2d9782b9f9581abcbfa81 100644 (file)
@@ -176,14 +176,25 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         return ret;
     }
 
-    @Override
-    void doSeal() {
-        Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", getIdentifier());
+    private void sealModification() {
+        Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", this);
         final CursorAwareDataTreeModification mod = getModification();
         mod.ready();
         sealedModification = mod;
     }
 
+    @Override
+    void sealOnly() {
+        sealModification();
+        super.sealOnly();
+    }
+
+    @Override
+    boolean sealAndSend(final com.google.common.base.Optional<Long> enqueuedTicks) {
+        sealModification();
+        return super.sealAndSend(enqueuedTicks);
+    }
+
     @Override
     void flushState(final AbstractProxyTransaction successor) {
         sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
@@ -239,14 +250,16 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         final Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
         if (maybeProtocol.isPresent()) {
             Verify.verify(callback != null, "Request {} has null callback", request);
-            ensureSealed();
+            if (markSealed()) {
+                sealOnly();
+            }
 
             switch (maybeProtocol.get()) {
                 case ABORT:
                     sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback);
                     break;
                 case READY:
-                    // No-op, as we have already issued a seal()
+                    // No-op, as we have already issued a sealOnly() and we are not transmitting anything
                     break;
                 case SIMPLE:
                     sendMethod.accept(commitRequest(false), callback);
@@ -264,7 +277,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
             final Consumer<Response<?, ?>> callback, final long now) {
         if (request instanceof CommitLocalTransactionRequest) {
-            sendCommit((CommitLocalTransactionRequest) request, callback);
+            enqueueRequest(rebaseCommit((CommitLocalTransactionRequest)request), callback, now);
         } else {
             super.handleReplayedLocalRequest(request, callback, now);
         }
@@ -308,7 +321,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             final Consumer<Response<?, ?>> callback) {
         if (request instanceof CommitLocalTransactionRequest) {
             Verify.verify(successor instanceof LocalReadWriteProxyTransaction);
-            ((LocalReadWriteProxyTransaction) successor).sendCommit((CommitLocalTransactionRequest)request, callback);
+            ((LocalReadWriteProxyTransaction) successor).sendRebased((CommitLocalTransactionRequest)request, callback);
             LOG.debug("Forwarded request {} to successor {}", request, successor);
         } else {
             super.forwardToLocal(successor, request, callback);
@@ -336,7 +349,11 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         return Preconditions.checkNotNull(modification, "Transaction %s is DONE", getIdentifier());
     }
 
-    private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+    private void sendRebased(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+        sendRequest(rebaseCommit(request), callback);
+    }
+
+    private CommitLocalTransactionRequest rebaseCommit(final CommitLocalTransactionRequest request) {
         // Rebase old modification on new data tree.
         final CursorAwareDataTreeModification mod = getModification();
 
@@ -344,7 +361,10 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             request.getModification().applyToCursor(cursor);
         }
 
-        ensureSealed();
-        sendRequest(commitRequest(request.isCoordinated()), callback);
+        if (markSealed()) {
+            sealOnly();
+        }
+
+        return commitRequest(request.isCoordinated());
     }
 }
index 3b5f80ffe8bc02ee267620f7bfafdbcd1cf01a2a..1ba96426df75feb7e38e4db0f217c8c4f98708b8 100644 (file)
@@ -146,9 +146,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     }
 
     private void ensureFlushedBuider() {
-        if (builderBusy) {
-            flushBuilder();
-        }
+        ensureFlushedBuider(Optional.absent());
     }
 
     private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
@@ -157,10 +155,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         }
     }
 
-    private void flushBuilder() {
-        flushBuilder(Optional.absent());
-    }
-
     private void flushBuilder(final Optional<Long> enqueuedTicks) {
         final ModifyTransactionRequest request = builder.build();
         builderBusy = false;
@@ -253,28 +247,33 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     ModifyTransactionRequest abortRequest() {
         ensureInitializedBuilder();
         builder.setAbort();
-        final ModifyTransactionRequest ret = builder.build();
         builderBusy = false;
-        return ret;
+        return builder.build();
     }
 
     @Override
     ModifyTransactionRequest commitRequest(final boolean coordinated) {
         ensureInitializedBuilder();
         builder.setCommit(coordinated);
+        builderBusy = false;
+        return builder.build();
+    }
 
-        final ModifyTransactionRequest ret = builder.build();
+    private ModifyTransactionRequest readyRequest() {
+        ensureInitializedBuilder();
+        builder.setReady();
         builderBusy = false;
-        return ret;
+        return builder.build();
     }
 
     @Override
-    void doSeal() {
+    boolean sealAndSend(final Optional<Long> enqueuedTicks) {
         if (sendReadyOnSeal) {
             ensureInitializedBuilder();
             builder.setReady();
-            flushBuilder();
+            flushBuilder(enqueuedTicks);
         }
+        return super.sealAndSend(enqueuedTicks);
     }
 
     @Override
@@ -300,7 +299,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
             if (maybeProto.isPresent()) {
-                ensureSealed();
+                // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+                // until we know what we are going to do.
+                if (markSealed()) {
+                    sealOnly();
+                }
 
                 final TransactionRequest<?> tmp;
                 switch (maybeProto.get()) {
@@ -326,7 +329,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                         });
                         break;
                     case READY:
-                        //no op
+                        tmp = readyRequest();
+                        sendRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            callback.accept(resp);
+                        });
                         break;
                     default:
                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
@@ -424,7 +431,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
             if (maybeProto.isPresent()) {
-                ensureSealed();
+                // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+                // until we know what we are going to do.
+                if (markSealed()) {
+                    sealOnly();
+                }
 
                 final TransactionRequest<?> tmp;
                 switch (maybeProto.get()) {
@@ -450,7 +461,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                         }, enqueuedTicks);
                         break;
                     case READY:
-                        //no op
+                        tmp = readyRequest();
+                        enqueueRequest(tmp, resp -> {
+                            recordSuccessfulRequest(tmp);
+                            cb.accept(resp);
+                        }, enqueuedTicks);
                         break;
                     default:
                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
index 7eebfe02cad234c588aa2b3e000cc8f10bf3de34..e0999211ee749016779da62f66325eb3d7b47dd7 100644 (file)
@@ -137,9 +137,9 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
     }
 
     @Test
-    public void testDoSeal() throws Exception {
+    public void testSealOnly() throws Exception {
         assertOperationThrowsException(() -> transaction.getSnapshot(), IllegalStateException.class);
-        transaction.doSeal();
+        transaction.sealOnly();
         Assert.assertEquals(modification, transaction.getSnapshot());
     }
 
@@ -148,7 +148,7 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
         final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
         final RemoteProxyTransaction successor = transactionTester.getTransaction();
         doAnswer(this::applyToCursorAnswer).when(modification).applyToCursor(any());
-        transaction.doSeal();
+        transaction.sealOnly();
         transaction.flushState(successor);
         verify(modification).applyToCursor(any());
         transactionTester.getTransaction().seal();