BUG-8538: rework transaction abort paths 05/58205/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 24 May 2017 12:42:04 +0000 (14:42 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 07:47:41 +0000 (09:47 +0200)
Direct transaction abort path can end up touching proxy history's
maps, which it should not, as that happens only after purge. This
inconsistency has cropped up when purge was introduced.

Refactor the methods so that cohorts are removed only after purge,
and fix abort request routing such that it always enqueues a purge
request (possibly via successor). This also addresses a FIXME, as
we now have an enqueueAbort() request, which is not waiting on the
queue.

Change-Id: Ie291da70ace772274f33505db376a915b38e37c0
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 8fca604f2312ef365ce05343c2378cf36f2e31af)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java

index 8dfdf34adfebae0a255d5f0f98c69886a3c9182d..b8057c5afe428fd162e0ddf02c6a2c6d11eab108 100644 (file)
@@ -330,14 +330,18 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      */
     final void abort() {
         checkNotSealed();
-        doAbort();
         parent.abortTransaction(this);
+
+        sendRequest(abortRequest(), resp -> {
+            LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
+            sendPurge();
+        });
     }
 
     final void abort(final VotingFuture<Void> ret) {
         checkSealed();
 
-        sendAbort(t -> {
+        sendDoAbort(t -> {
             if (t instanceof TransactionAbortSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -353,11 +357,24 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void enqueueAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        checkNotSealed();
+        parent.abortTransaction(this);
+
+        enqueueRequest(abortRequest(), resp -> {
+            LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
+            // Purge will be sent by the predecessor's callback
+            if (callback != null) {
+                callback.accept(resp);
+            }
+        }, enqueuedTicks);
+    }
+
+    final void enqueueDoAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
         enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback,
             enqueuedTicks);
     }
 
-    final void sendAbort(final Consumer<Response<?, ?>> callback) {
+    final void sendDoAbort(final Consumer<Response<?, ?>> callback) {
         sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
     }
 
@@ -484,24 +501,29 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         });
     }
 
-    final void sendPurge() {
-        successfulRequests.clear();
+    private void sendPurge() {
+        sendPurge(null);
+    }
 
-        final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
-        sendRequest(req, t -> {
-            LOG.debug("Transaction {} purge completed", this);
-            parent.completeTransaction(this);
-        });
+    final void sendPurge(final Consumer<Response<?, ?>> callback) {
+        sendRequest(purgeRequest(), resp -> completePurge(resp, callback));
+    }
+
+    final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+        enqueueRequest(purgeRequest(), resp -> completePurge(resp, callback), enqueuedTicks);
     }
 
-    final void enqueuePurge(final long enqueuedTicks) {
+    private TransactionPurgeRequest purgeRequest() {
         successfulRequests.clear();
+        return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+    }
 
-        final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
-        enqueueRequest(req, t -> {
-            LOG.debug("Transaction {} purge completed", this);
-            parent.completeTransaction(this);
-        }, enqueuedTicks);
+    private void completePurge(final Response<?, ?> resp, final Consumer<Response<?, ?>> callback) {
+        LOG.debug("Transaction {} purge completed", this);
+        parent.completeTransaction(this);
+        if (callback != null) {
+            callback.accept(resp);
+        }
     }
 
     // Called with the connection unlocked
@@ -645,11 +667,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
     abstract void doSeal();
 
-    abstract void doAbort();
-
     @GuardedBy("this")
     abstract void flushState(AbstractProxyTransaction successor);
 
+    abstract TransactionRequest<?> abortRequest();
+
     abstract TransactionRequest<?> commitRequest(boolean coordinated);
 
     /**
index 5b47f22971c9af76298e82032202871bb986b96b..49aedaf83d00f50223966f1cc941a69876072834 100644 (file)
@@ -70,7 +70,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
 
     abstract DataTreeSnapshot readOnlyView();
 
-    abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
+    abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request,
             @Nullable Consumer<Response<?, ?>> callback);
 
     abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
@@ -87,9 +87,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
     }
 
     @Override
-    final void doAbort() {
-        sendAbort(new AbortLocalTransactionRequest(identifier, localActor()),
-            response -> LOG.debug("Transaction {} abort completed with {}", identifier, response));
+    final AbortLocalTransactionRequest abortRequest() {
+        return new AbortLocalTransactionRequest(identifier, localActor());
     }
 
     @Override
@@ -139,7 +138,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         } else if (handleReadRequest(request, callback)) {
             // No-op
         } else if (request instanceof TransactionPurgeRequest) {
-            enqueuePurge(enqueuedTicks);
+            enqueuePurge(callback, enqueuedTicks);
         } else if (request instanceof IncrementTransactionSequenceRequest) {
             // Local transactions do not have non-replayable requests which would be visible to the backend,
             // hence we can skip sequence increments.
@@ -159,11 +158,11 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
      */
     void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         if (request instanceof ModifyTransactionRequest) {
-            applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+            applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback);
         } else if (handleReadRequest(request, callback)) {
             // No-op
         } else if (request instanceof TransactionPurgeRequest) {
-            sendPurge();
+            sendPurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
@@ -203,7 +202,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
             successor.abort();
         } else if (request instanceof TransactionPurgeRequest) {
             LOG.debug("Forwarding purge {} to successor {}", request, successor);
-            successor.sendPurge();
+            successor.sendPurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
@@ -215,7 +214,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         if (request instanceof AbortLocalTransactionRequest) {
             successor.sendAbort(request, callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            successor.sendPurge();
+            successor.sendPurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
index b600a66ea6089c1473d28744481107d1090d2262..4fed10d4bf319cb4f399f5f0feedc2a2feadc007 100644 (file)
@@ -77,22 +77,20 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
     }
 
     @Override
-    void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+    void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
             final Consumer<Response<?, ?>> callback) {
-        commonModifyTransactionRequest(request, callback);
+        commonModifyTransactionRequest(request);
         abort();
     }
 
     @Override
     void replayModifyTransactionRequest(final ModifyTransactionRequest request,
             final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
-        commonModifyTransactionRequest(request, callback);
-        // FIXME: this should go through the enqueueRequest() path
-        abort();
+        commonModifyTransactionRequest(request);
+        enqueueAbort(callback, enqueuedTicks);
     }
 
-    private static void commonModifyTransactionRequest(final ModifyTransactionRequest request,
-            final Consumer<Response<?, ?>> callback) {
+    private static void commonModifyTransactionRequest(final ModifyTransactionRequest request) {
         Verify.verify(request.getModifications().isEmpty());
 
         final PersistenceProtocol protocol = request.getPersistenceProtocol().get();
index 424a9ea0db220752c979dbdae89672bfcb505790..cbf316d88194e7cbbf59dccc057cbf9d6fc5f5dd 100644 (file)
@@ -203,7 +203,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     }
 
     @Override
-    void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+    void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
             final @Nullable Consumer<Response<?, ?>> callback) {
         commonModifyTransactionRequest(request, callback, this::sendRequest);
     }
@@ -275,7 +275,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
                 enqueuedTicks);
         } else if (request instanceof TransactionAbortRequest) {
-            enqueueAbort(callback, enqueuedTicks);
+            enqueueDoAbort(callback, enqueuedTicks);
         } else {
             super.handleReplayedRemoteRequest(request, callback, enqueuedTicks);
         }
@@ -290,7 +290,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         } else if (request instanceof TransactionDoCommitRequest) {
             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
         } else if (request instanceof TransactionAbortRequest) {
-            sendAbort(callback);
+            sendDoAbort(callback);
         } else {
             super.handleForwardedRemoteRequest(request, callback);
         }
index b529d94c2b226bb879b9c0f3cf4baa07a7414359..34e8ba37a379669de09013099ad83a8e9ceaa38c 100644 (file)
@@ -376,8 +376,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     final void abortTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
-            proxies.remove(tx.getIdentifier());
-            LOG.debug("Proxy {} aborting transaction {}", this, tx);
+            // Removal will be completed once purge completes
+            LOG.debug("Proxy {} aborted transaction {}", this, tx);
             onTransactionAborted(tx);
         } finally {
             lock.unlock();
index 829be478f00a6210c88a027547d1fe6a76ec135a..0095ec58a37512c66aefd250652980a9b4dce0fb 100644 (file)
@@ -138,13 +138,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             isSnapshotOnly()), t -> completeRead(future, t), future);
     }
 
-    @Override
-    void doAbort() {
-        ensureInitializedBuilder();
-        builder.setAbort();
-        flushBuilder();
-    }
-
     private void ensureInitializedBuilder() {
         if (!builderBusy) {
             builder.setSequence(nextSequence());
@@ -256,7 +249,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         recordFinishedRequest(response);
     }
 
-    private ModifyTransactionRequest abortRequest() {
+    @Override
+    ModifyTransactionRequest abortRequest() {
         ensureInitializedBuilder();
         builder.setAbort();
         final ModifyTransactionRequest ret = builder.build();
@@ -365,9 +359,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
         } else if (request instanceof TransactionAbortRequest) {
             ensureFlushedBuider();
-            sendAbort(callback);
+            sendDoAbort(callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            sendPurge();
+            sendPurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }
@@ -490,9 +484,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                 enqueuedTicks);
         } else if (request instanceof TransactionAbortRequest) {
             ensureFlushedBuider(optTicks);
-            enqueueAbort(callback, enqueuedTicks);
+            enqueueDoAbort(callback, enqueuedTicks);
         } else if (request instanceof TransactionPurgeRequest) {
-            enqueuePurge(enqueuedTicks);
+            enqueuePurge(callback, enqueuedTicks);
         } else if (request instanceof IncrementTransactionSequenceRequest) {
             final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
             ensureFlushedBuider(optTicks);
index e536a0677208c6a6aaec97d494d1b3807b626b20..d0339518792db44219734f811adb128145118dad 100644 (file)
@@ -55,8 +55,8 @@ public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
     }
 
     @Test
-    public void testDoAbort() throws Exception {
-        transaction.doAbort();
+    public void testAbort() throws Exception {
+        transaction.abort();
         getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
     }