BUG-8538: rework transaction abort paths 05/58205/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 24 May 2017 12:42:04 +0000 (14:42 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 07:47:41 +0000 (09:47 +0200)
Direct transaction abort path can end up touching proxy history's
maps, which it should not, as that happens only after purge. This
inconsistency has cropped up when purge was introduced.

Refactor the methods so that cohorts are removed only after purge,
and fix abort request routing such that it always enqueues a purge
request (possibly via successor). This also addresses a FIXME, as
we now have an enqueueAbort() request, which is not waiting on the
queue.

Change-Id: Ie291da70ace772274f33505db376a915b38e37c0
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 8fca604f2312ef365ce05343c2378cf36f2e31af)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java

index 8dfdf34..b8057c5 100644 (file)
@@ -330,14 +330,18 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      */
     final void abort() {
         checkNotSealed();
-        doAbort();
         parent.abortTransaction(this);
+
+        sendRequest(abortRequest(), resp -> {
+            LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
+            sendPurge();
+        });
     }
 
     final void abort(final VotingFuture<Void> ret) {
         checkSealed();
 
-        sendAbort(t -> {
+        sendDoAbort(t -> {
             if (t instanceof TransactionAbortSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -353,11 +357,24 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void enqueueAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        checkNotSealed();
+        parent.abortTransaction(this);
+
+        enqueueRequest(abortRequest(), resp -> {
+            LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
+            // Purge will be sent by the predecessor's callback
+            if (callback != null) {
+                callback.accept(resp);
+            }
+        }, enqueuedTicks);
+    }
+
+    final void enqueueDoAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
         enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback,
             enqueuedTicks);
     }
 
-    final void sendAbort(final Consumer<Response<?, ?>> callback) {
+    final void sendDoAbort(final Consumer<Response<?, ?>> callback) {
         sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
     }
 
@@ -484,24 +501,29 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         });
     }
 
-    final void sendPurge() {
-        successfulRequests.clear();
+    private void sendPurge() {
+        sendPurge(null);
+    }
 
-        final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
-        sendRequest(req, t -> {
-            LOG.debug("Transaction {} purge completed", this);
-            parent.completeTransaction(this);
-        });
+    final void sendPurge(final Consumer<Response<?, ?>> callback) {
+        sendRequest(purgeRequest(), resp -> completePurge(resp, callback));
+    }
+
+    final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        enqueueRequest(purgeRequest(), resp -> completePurge(resp, callback), enqueuedTicks);
     }
 
-    final void enqueuePurge(final long enqueuedTicks) {
+    private TransactionPurgeRequest purgeRequest() {
         successfulRequests.clear();
+        return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+    }
 
-        final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
-        enqueueRequest(req, t -> {
-            LOG.debug("Transaction {} purge completed", this);
-            parent.completeTransaction(this);
-        }, enqueuedTicks);
+    private void completePurge(final Response<?, ?> resp, final Consumer<Response<?, ?>> callback) {
+        LOG.debug("Transaction {} purge completed", this);
+        parent.completeTransaction(this);
+        if (callback != null) {
+            callback.accept(resp);
+        }
     }
 
     // Called with the connection unlocked
@@ -645,11 +667,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
     abstract void doSeal();
 
-    abstract void doAbort();
-
     @GuardedBy("this")
     abstract void flushState(AbstractProxyTransaction successor);
 
+    abstract TransactionRequest<?> abortRequest();
+
     abstract TransactionRequest<?> commitRequest(boolean coordinated);
 
     /**
index 5b47f22..49aedaf 100644 (file)
@@ -70,7 +70,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
 
     abstract DataTreeSnapshot readOnlyView();
 
-    abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
+    abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request,
             @Nullable Consumer<Response<?, ?>> callback);
 
     abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
@@ -87,9 +87,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     }
 
     @Override
-    final void doAbort() {
-        sendAbort(new AbortLocalTransactionRequest(identifier, localActor()),
-            response -> LOG.debug("Transaction {} abort completed with {}", identifier, response));
+    final AbortLocalTransactionRequest abortRequest() {
+        return new AbortLocalTransactionRequest(identifier, localActor());
     }
 
     @Override
@@ -139,7 +138,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         } else if (handleReadRequest(request, callback)) {
             // No-op
         } else if (request instanceof TransactionPurgeRequest) {
-            enqueuePurge(enqueuedTicks);
+            enqueuePurge(callback, enqueuedTicks);
         } else if (request instanceof IncrementTransactionSequenceRequest) {
             // Local transactions do not have non-replayable requests which would be visible to the backend,
             // hence we can skip sequence increments.
@@ -159,11 +158,11 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
      */
     void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         if (request instanceof ModifyTransactionRequest) {
-            applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+            applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback);
         } else if (handleReadRequest(request, callback)) {
             // No-op
         } else if (request instanceof TransactionPurgeRequest) {
-            sendPurge();
+            sendPurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
@@ -203,7 +202,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
             successor.abort();
         } else if (request instanceof TransactionPurgeRequest) {
             LOG.debug("Forwarding purge {} to successor {}", request, successor);
-            successor.sendPurge();
+            successor.sendPurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
@@ -215,7 +214,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         if (request instanceof AbortLocalTransactionRequest) {
             successor.sendAbort(request, callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            successor.sendPurge();
+            successor.sendPurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
index b600a66..4fed10d 100644 (file)
@@ -77,22 +77,20 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
     }
 
     @Override
-    void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+    void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
             final Consumer<Response<?, ?>> callback) {
-        commonModifyTransactionRequest(request, callback);
+        commonModifyTransactionRequest(request);
         abort();
     }
 
     @Override
     void replayModifyTransactionRequest(final ModifyTransactionRequest request,
             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
-        commonModifyTransactionRequest(request, callback);
-        // FIXME: this should go through the enqueueRequest() path
-        abort();
+        commonModifyTransactionRequest(request);
+        enqueueAbort(callback, enqueuedTicks);
     }
 
-    private static void commonModifyTransactionRequest(final ModifyTransactionRequest request,
-            final Consumer<Response<?, ?>> callback) {
+    private static void commonModifyTransactionRequest(final ModifyTransactionRequest request) {
         Verify.verify(request.getModifications().isEmpty());
 
         final PersistenceProtocol protocol = request.getPersistenceProtocol().get();
index 424a9ea..cbf316d 100644 (file)
@@ -203,7 +203,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     }
 
     @Override
-    void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+    void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
             final @Nullable Consumer<Response<?, ?>> callback) {
         commonModifyTransactionRequest(request, callback, this::sendRequest);
     }
@@ -275,7 +275,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
                 enqueuedTicks);
         } else if (request instanceof TransactionAbortRequest) {
-            enqueueAbort(callback, enqueuedTicks);
+            enqueueDoAbort(callback, enqueuedTicks);
         } else {
             super.handleReplayedRemoteRequest(request, callback, enqueuedTicks);
         }
@@ -290,7 +290,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         } else if (request instanceof TransactionDoCommitRequest) {
             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
         } else if (request instanceof TransactionAbortRequest) {
-            sendAbort(callback);
+            sendDoAbort(callback);
         } else {
             super.handleForwardedRemoteRequest(request, callback);
         }
index b529d94..34e8ba3 100644 (file)
@@ -376,8 +376,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     final void abortTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
-            proxies.remove(tx.getIdentifier());
-            LOG.debug("Proxy {} aborting transaction {}", this, tx);
+            // Removal will be completed once purge completes
+            LOG.debug("Proxy {} aborted transaction {}", this, tx);
             onTransactionAborted(tx);
         } finally {
             lock.unlock();
index 829be47..0095ec5 100644 (file)
@@ -138,13 +138,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             isSnapshotOnly()), t -> completeRead(future, t), future);
     }
 
-    @Override
-    void doAbort() {
-        ensureInitializedBuilder();
-        builder.setAbort();
-        flushBuilder();
-    }
-
     private void ensureInitializedBuilder() {
         if (!builderBusy) {
             builder.setSequence(nextSequence());
@@ -256,7 +249,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         recordFinishedRequest(response);
     }
 
-    private ModifyTransactionRequest abortRequest() {
+    @Override
+    ModifyTransactionRequest abortRequest() {
         ensureInitializedBuilder();
         builder.setAbort();
         final ModifyTransactionRequest ret = builder.build();
@@ -365,9 +359,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
         } else if (request instanceof TransactionAbortRequest) {
             ensureFlushedBuider();
-            sendAbort(callback);
+            sendDoAbort(callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            sendPurge();
+            sendPurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }
@@ -490,9 +484,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                 enqueuedTicks);
         } else if (request instanceof TransactionAbortRequest) {
             ensureFlushedBuider(optTicks);
-            enqueueAbort(callback, enqueuedTicks);
+            enqueueDoAbort(callback, enqueuedTicks);
         } else if (request instanceof TransactionPurgeRequest) {
-            enqueuePurge(enqueuedTicks);
+            enqueuePurge(callback, enqueuedTicks);
         } else if (request instanceof IncrementTransactionSequenceRequest) {
             final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
             ensureFlushedBuider(optTicks);
index e536a06..d033951 100644 (file)
@@ -55,8 +55,8 @@ public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
     }
 
     @Test
-    public void testDoAbort() throws Exception {
-        transaction.doAbort();
+    public void testAbort() throws Exception {
+        transaction.abort();
         getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
     }
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.