From 18ddbfdc55a1faddf7aeb2df6b25481d34c820ab Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 28 Jun 2017 16:47:13 +0200 Subject: [PATCH] BUG-8704: rework seal mechanics to not wait during replay AbstractProxyTransaction.seal() and most notably internalSeal() can end up pushing down messages down the connection hence they can end up slowing down the replay process. The replay paths end up enqueing subsequent requests anyway, so rework the structure to split the 'seal only' and 'seal and flush' codepaths. Change-Id: Ie75c1ef8aa0d3d5d7ca482d383fd516077ca50b4 Signed-off-by: Robert Varga (cherry picked from commit 1e07329c0d800b8fea43ae0c4060aded5fd18739) --- .../actors/dds/AbstractProxyTransaction.java | 73 +++++++++++++------ .../actors/dds/LocalProxyTransaction.java | 3 +- .../dds/LocalReadOnlyProxyTransaction.java | 5 -- .../dds/LocalReadWriteProxyTransaction.java | 40 +++++++--- .../actors/dds/RemoteProxyTransaction.java | 49 ++++++++----- .../LocalReadWriteProxyTransactionTest.java | 8 +- 6 files changed, 119 insertions(+), 59 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 84546632f0..07b89e0923 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -327,36 +327,67 @@ abstract class AbstractProxyTransaction implements Identifiable enqueuedTicks) { + parent.onTransactionSealed(this); - // At this point the successor has completed transition and is possibly visible by the user thread, which is - // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed. - // Propagate state and seal the successor. - flushState(successor); - successor.ensureSealed(); - } + // Transition internal state to sealed and detect presence of a successor + return STATE_UPDATER.compareAndSet(this, OPEN, SEALED); + } + + /** + * Mark this proxy as having been sealed. + * + * @return True if this call has transitioned to sealed state. + */ + final boolean markSealed() { + return SEALED_UPDATER.compareAndSet(this, 0, 1); } private void checkNotSealed() { @@ -689,7 +720,9 @@ abstract class AbstractProxyTransaction implements Identifiable>, ReadFailedException> doRead(YangInstanceIdentifier path); - abstract void doSeal(); - @GuardedBy("this") abstract void flushState(AbstractProxyTransaction successor); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index feba5fde5b..99806fd81f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -194,8 +194,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } }); - successor.ensureSealed(); - + successor.sealOnly(); final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); successor.sendRequest(successorReq, callback); } else if (request instanceof AbortLocalTransactionRequest) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java index 7cc245a42f..e85c86f5b5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java @@ -72,11 +72,6 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction { throw new UnsupportedOperationException("Read-only snapshot"); } - @Override - void doSeal() { - // No-op - } - @Override void flushState(final AbstractProxyTransaction successor) { // No-op diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java index eee4fd0e13..c44e70eb30 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java @@ -176,14 +176,25 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { return ret; } - @Override - void doSeal() { - Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", getIdentifier()); + private void sealModification() { + Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", this); final CursorAwareDataTreeModification mod = getModification(); mod.ready(); sealedModification = mod; } + @Override + void sealOnly() { + sealModification(); + super.sealOnly(); + } + + @Override + boolean sealAndSend(final com.google.common.base.Optional enqueuedTicks) { + sealModification(); + return super.sealAndSend(enqueuedTicks); + } + @Override void flushState(final AbstractProxyTransaction successor) { sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() { @@ -239,14 +250,16 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { final Optional maybeProtocol = request.getPersistenceProtocol(); if (maybeProtocol.isPresent()) { Verify.verify(callback != null, "Request {} has null callback", request); - ensureSealed(); + if (markSealed()) { + sealOnly(); + } switch (maybeProtocol.get()) { case ABORT: sendMethod.accept(new AbortLocalTransactionRequest(getIdentifier(), localActor()), callback); break; case READY: - // No-op, as we have already issued a seal() + // No-op, as we have already issued a sealOnly() and we are not transmitting anything break; case SIMPLE: sendMethod.accept(commitRequest(false), callback); @@ -264,7 +277,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, final Consumer> callback, final long now) { if (request instanceof CommitLocalTransactionRequest) { - sendCommit((CommitLocalTransactionRequest) request, callback); + enqueueRequest(rebaseCommit((CommitLocalTransactionRequest)request), callback, now); } else { super.handleReplayedLocalRequest(request, callback, now); } @@ -308,7 +321,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { final Consumer> callback) { if (request instanceof CommitLocalTransactionRequest) { Verify.verify(successor instanceof LocalReadWriteProxyTransaction); - ((LocalReadWriteProxyTransaction) successor).sendCommit((CommitLocalTransactionRequest)request, callback); + ((LocalReadWriteProxyTransaction) successor).sendRebased((CommitLocalTransactionRequest)request, callback); LOG.debug("Forwarded request {} to successor {}", request, successor); } else { super.forwardToLocal(successor, request, callback); @@ -336,7 +349,11 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { return Preconditions.checkNotNull(modification, "Transaction %s is DONE", getIdentifier()); } - private void sendCommit(final CommitLocalTransactionRequest request, final Consumer> callback) { + private void sendRebased(final CommitLocalTransactionRequest request, final Consumer> callback) { + sendRequest(rebaseCommit(request), callback); + } + + private CommitLocalTransactionRequest rebaseCommit(final CommitLocalTransactionRequest request) { // Rebase old modification on new data tree. final CursorAwareDataTreeModification mod = getModification(); @@ -344,7 +361,10 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { request.getModification().applyToCursor(cursor); } - ensureSealed(); - sendRequest(commitRequest(request.isCoordinated()), callback); + if (markSealed()) { + sealOnly(); + } + + return commitRequest(request.isCoordinated()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 3b5f80ffe8..1ba96426df 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -146,9 +146,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void ensureFlushedBuider() { - if (builderBusy) { - flushBuilder(); - } + ensureFlushedBuider(Optional.absent()); } private void ensureFlushedBuider(final Optional enqueuedTicks) { @@ -157,10 +155,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } } - private void flushBuilder() { - flushBuilder(Optional.absent()); - } - private void flushBuilder(final Optional enqueuedTicks) { final ModifyTransactionRequest request = builder.build(); builderBusy = false; @@ -253,28 +247,33 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ModifyTransactionRequest abortRequest() { ensureInitializedBuilder(); builder.setAbort(); - final ModifyTransactionRequest ret = builder.build(); builderBusy = false; - return ret; + return builder.build(); } @Override ModifyTransactionRequest commitRequest(final boolean coordinated) { ensureInitializedBuilder(); builder.setCommit(coordinated); + builderBusy = false; + return builder.build(); + } - final ModifyTransactionRequest ret = builder.build(); + private ModifyTransactionRequest readyRequest() { + ensureInitializedBuilder(); + builder.setReady(); builderBusy = false; - return ret; + return builder.build(); } @Override - void doSeal() { + boolean sealAndSend(final Optional enqueuedTicks) { if (sendReadyOnSeal) { ensureInitializedBuilder(); builder.setReady(); - flushBuilder(); + flushBuilder(enqueuedTicks); } + return super.sealAndSend(enqueuedTicks); } @Override @@ -300,7 +299,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final java.util.Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { - ensureSealed(); + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } final TransactionRequest tmp; switch (maybeProto.get()) { @@ -326,7 +329,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { }); break; case READY: - //no op + tmp = readyRequest(); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); break; default: throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); @@ -424,7 +431,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final java.util.Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { - ensureSealed(); + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } final TransactionRequest tmp; switch (maybeProto.get()) { @@ -450,7 +461,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { }, enqueuedTicks); break; case READY: - //no op + tmp = readyRequest(); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); break; default: throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java index 7f1508d4d4..cbd9f3da3e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java @@ -137,9 +137,9 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes } @Test - public void testDoSeal() throws Exception { + public void testSealOnly() throws Exception { assertOperationThrowsException(() -> transaction.getSnapshot(), IllegalStateException.class); - transaction.doSeal(); + transaction.sealOnly(); Assert.assertEquals(modification, transaction.getSnapshot()); } @@ -148,7 +148,7 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes final TransactionTester transactionTester = createRemoteProxyTransactionTester(); final RemoteProxyTransaction successor = transactionTester.getTransaction(); doAnswer(LocalProxyTransactionTest::applyToCursorAnswer).when(modification).applyToCursor(any()); - transaction.doSeal(); + transaction.sealOnly(); transaction.flushState(successor); verify(modification).applyToCursor(any()); transactionTester.getTransaction().seal(); @@ -246,4 +246,4 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes Assert.assertEquals(coordinated, commitRequest.isCoordinated()); } -} \ No newline at end of file +} -- 2.36.6