From: Robert Varga Date: Wed, 24 May 2017 12:42:04 +0000 (+0200) Subject: BUG-8538: rework transaction abort paths X-Git-Tag: release/nitrogen~157 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=b4e199c509cab1c9b838c6471a96ce3cb21fa913 BUG-8538: rework transaction abort paths Direct transaction abort path can end up touching proxy history's maps, which it should not, as that happens only after purge. This inconsistency has cropped up when purge was introduced. Refactor the methods so that cohorts are removed only after purge, and fix abort request routing such that it always enqueues a purge request (possibly via successor). This also addresses a FIXME, as we now have an enqueueAbort() request, which is not waiting on the queue. Change-Id: Ie291da70ace772274f33505db376a915b38e37c0 Signed-off-by: Robert Varga (cherry picked from commit 8fca604f2312ef365ce05343c2378cf36f2e31af) --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 8dfdf34adf..b8057c5afe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -330,14 +330,18 @@ abstract class AbstractProxyTransaction implements Identifiable { + LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp); + sendPurge(); + }); } final void abort(final VotingFuture ret) { checkSealed(); - sendAbort(t -> { + sendDoAbort(t -> { if (t instanceof TransactionAbortSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -353,11 +357,24 @@ abstract class AbstractProxyTransaction implements Identifiable> callback, final long enqueuedTicks) { + checkNotSealed(); + parent.abortTransaction(this); + + enqueueRequest(abortRequest(), resp -> { + LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp); + // Purge will be sent by the predecessor's callback + if (callback != null) { + callback.accept(resp); + } + }, enqueuedTicks); + } + + final void enqueueDoAbort(final Consumer> callback, final long enqueuedTicks) { enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback, enqueuedTicks); } - final void sendAbort(final Consumer> callback) { + final void sendDoAbort(final Consumer> callback) { sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback); } @@ -484,24 +501,29 @@ abstract class AbstractProxyTransaction implements Identifiable req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); - sendRequest(req, t -> { - LOG.debug("Transaction {} purge completed", this); - parent.completeTransaction(this); - }); + final void sendPurge(final Consumer> callback) { + sendRequest(purgeRequest(), resp -> completePurge(resp, callback)); + } + + final void enqueuePurge(final Consumer> callback, final long enqueuedTicks) { + enqueueRequest(purgeRequest(), resp -> completePurge(resp, callback), enqueuedTicks); } - final void enqueuePurge(final long enqueuedTicks) { + private TransactionPurgeRequest purgeRequest() { successfulRequests.clear(); + return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); + } - final TransactionRequest req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()); - enqueueRequest(req, t -> { - LOG.debug("Transaction {} purge completed", this); - parent.completeTransaction(this); - }, enqueuedTicks); + private void completePurge(final Response resp, final Consumer> callback) { + LOG.debug("Transaction {} purge completed", this); + parent.completeTransaction(this); + if (callback != null) { + callback.accept(resp); + } } // Called with the connection unlocked @@ -645,11 +667,11 @@ abstract class AbstractProxyTransaction implements Identifiable abortRequest(); + abstract TransactionRequest commitRequest(boolean coordinated); /** diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 5b47f22971..49aedaf83d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -70,7 +70,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { abstract DataTreeSnapshot readOnlyView(); - abstract void applyModifyTransactionRequest(ModifyTransactionRequest request, + abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request, @Nullable Consumer> callback); abstract void replayModifyTransactionRequest(ModifyTransactionRequest request, @@ -87,9 +87,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } @Override - final void doAbort() { - sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), - response -> LOG.debug("Transaction {} abort completed with {}", identifier, response)); + final AbortLocalTransactionRequest abortRequest() { + return new AbortLocalTransactionRequest(identifier, localActor()); } @Override @@ -139,7 +138,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } else if (handleReadRequest(request, callback)) { // No-op } else if (request instanceof TransactionPurgeRequest) { - enqueuePurge(enqueuedTicks); + enqueuePurge(callback, enqueuedTicks); } else if (request instanceof IncrementTransactionSequenceRequest) { // Local transactions do not have non-replayable requests which would be visible to the backend, // hence we can skip sequence increments. @@ -159,11 +158,11 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { */ void handleForwardedRemoteRequest(final TransactionRequest request, final Consumer> callback) { if (request instanceof ModifyTransactionRequest) { - applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); + applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback); } else if (handleReadRequest(request, callback)) { // No-op } else if (request instanceof TransactionPurgeRequest) { - sendPurge(); + sendPurge(callback); } else { throw new IllegalArgumentException("Unhandled request " + request); } @@ -203,7 +202,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { successor.abort(); } else if (request instanceof TransactionPurgeRequest) { LOG.debug("Forwarding purge {} to successor {}", request, successor); - successor.sendPurge(); + successor.sendPurge(callback); } else { throw new IllegalArgumentException("Unhandled request" + request); } @@ -215,7 +214,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { if (request instanceof AbortLocalTransactionRequest) { successor.sendAbort(request, callback); } else if (request instanceof TransactionPurgeRequest) { - successor.sendPurge(); + successor.sendPurge(callback); } else { throw new IllegalArgumentException("Unhandled request" + request); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java index b600a66ea6..4fed10d4bf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java @@ -77,22 +77,20 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction { } @Override - void applyModifyTransactionRequest(final ModifyTransactionRequest request, + void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request, final Consumer> callback) { - commonModifyTransactionRequest(request, callback); + commonModifyTransactionRequest(request); abort(); } @Override void replayModifyTransactionRequest(final ModifyTransactionRequest request, final Consumer> callback, final long enqueuedTicks) { - commonModifyTransactionRequest(request, callback); - // FIXME: this should go through the enqueueRequest() path - abort(); + commonModifyTransactionRequest(request); + enqueueAbort(callback, enqueuedTicks); } - private static void commonModifyTransactionRequest(final ModifyTransactionRequest request, - final Consumer> callback) { + private static void commonModifyTransactionRequest(final ModifyTransactionRequest request) { Verify.verify(request.getModifications().isEmpty()); final PersistenceProtocol protocol = request.getPersistenceProtocol().get(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java index 424a9ea0db..cbf316d881 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java @@ -203,7 +203,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } @Override - void applyModifyTransactionRequest(final ModifyTransactionRequest request, + void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request, final @Nullable Consumer> callback) { commonModifyTransactionRequest(request, callback, this::sendRequest); } @@ -275,7 +275,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback, enqueuedTicks); } else if (request instanceof TransactionAbortRequest) { - enqueueAbort(callback, enqueuedTicks); + enqueueDoAbort(callback, enqueuedTicks); } else { super.handleReplayedRemoteRequest(request, callback, enqueuedTicks); } @@ -290,7 +290,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { } else if (request instanceof TransactionDoCommitRequest) { sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); } else if (request instanceof TransactionAbortRequest) { - sendAbort(callback); + sendDoAbort(callback); } else { super.handleForwardedRemoteRequest(request, callback); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index b529d94c2b..34e8ba37a3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -376,8 +376,8 @@ abstract class ProxyHistory implements Identifiable { final void abortTransaction(final AbstractProxyTransaction tx) { lock.lock(); try { - proxies.remove(tx.getIdentifier()); - LOG.debug("Proxy {} aborting transaction {}", this, tx); + // Removal will be completed once purge completes + LOG.debug("Proxy {} aborted transaction {}", this, tx); onTransactionAborted(tx); } finally { lock.unlock(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 829be478f0..0095ec58a3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -138,13 +138,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { isSnapshotOnly()), t -> completeRead(future, t), future); } - @Override - void doAbort() { - ensureInitializedBuilder(); - builder.setAbort(); - flushBuilder(); - } - private void ensureInitializedBuilder() { if (!builderBusy) { builder.setSequence(nextSequence()); @@ -256,7 +249,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { recordFinishedRequest(response); } - private ModifyTransactionRequest abortRequest() { + @Override + ModifyTransactionRequest abortRequest() { ensureInitializedBuilder(); builder.setAbort(); final ModifyTransactionRequest ret = builder.build(); @@ -365,9 +359,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); } else if (request instanceof TransactionAbortRequest) { ensureFlushedBuider(); - sendAbort(callback); + sendDoAbort(callback); } else if (request instanceof TransactionPurgeRequest) { - sendPurge(); + sendPurge(callback); } else { throw new IllegalArgumentException("Unhandled request {}" + request); } @@ -490,9 +484,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { enqueuedTicks); } else if (request instanceof TransactionAbortRequest) { ensureFlushedBuider(optTicks); - enqueueAbort(callback, enqueuedTicks); + enqueueDoAbort(callback, enqueuedTicks); } else if (request instanceof TransactionPurgeRequest) { - enqueuePurge(enqueuedTicks); + enqueuePurge(callback, enqueuedTicks); } else if (request instanceof IncrementTransactionSequenceRequest) { final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request; ensureFlushedBuider(optTicks); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java index e536a06772..d033951879 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java @@ -55,8 +55,8 @@ public abstract class LocalProxyTransactionTest } @Test - public void testDoAbort() throws Exception { - transaction.doAbort(); + public void testAbort() throws Exception { + transaction.abort(); getTester().expectTransactionRequest(AbortLocalTransactionRequest.class); }