BUG-8403: go through the DONE transition 12/58212/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 31 May 2017 17:49:52 +0000 (19:49 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 07:47:42 +0000 (09:47 +0200)
Third step of the fix: make make AbstractProxyTransaction go through
the DONE state before retiring. This ends up also fixing breakage
in local chain transactions, which could end up leaking because we
never go back to just using the base data tree.

Change-Id: I97ac1687eaf3ecd8f46a68c6170891ea06703e95
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 9b4c07ca27b2c3c7ca5d5e790aa1f121ce4857f3)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java

index 549cab2..ac91f2d 100644 (file)
@@ -119,11 +119,14 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * at which point the request is routed to the successor transaction. This is a relatively heavy-weight solution
      * to the problem of state transfer, but the user will observe it only if the race condition is hit.
      */
-    private static final class SuccessorState extends State {
+    private static class SuccessorState extends State {
         private final CountDownLatch latch = new CountDownLatch(1);
         private AbstractProxyTransaction successor;
         private State prevState;
 
+        // SUCCESSOR + DONE
+        private boolean done;
+
         SuccessorState() {
             super("SUCCESSOR");
         }
@@ -163,6 +166,14 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                     successor, this.successor);
             this.successor = Preconditions.checkNotNull(successor);
         }
+
+        boolean isDone() {
+            return done;
+        }
+
+        void setDone() {
+            done = true;
+        }
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
@@ -544,6 +555,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
+
+            // Needed for ProxyHistory$Local data tree rebase points.
+            parent.completeTransaction(this);
+
             enqueuePurge();
         });
     }
@@ -558,20 +573,30 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
-        enqueueRequest(purgeRequest(), resp -> {
-            LOG.debug("Transaction {} purge completed", this);
-            parent.completeTransaction(this);
+        LOG.debug("{}: initiating purge", this);
+
+        final State prev = state;
+        if (prev instanceof SuccessorState) {
+            ((SuccessorState) prev).setDone();
+        } else {
+            final boolean success = STATE_UPDATER.compareAndSet(this, prev, DONE);
+            if (!success) {
+                LOG.warn("{}: moved from state {} while we were purging it", this, prev);
+            }
+        }
+
+        successfulRequests.clear();
+
+        enqueueRequest(new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()), resp -> {
+            LOG.debug("{}: purge completed", this);
+            parent.purgeTransaction(this);
+
             if (callback != null) {
                 callback.accept(resp);
             }
         }, enqueuedTicks);
     }
 
-    private TransactionPurgeRequest purgeRequest() {
-        successfulRequests.clear();
-        return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
-    }
-
     // Called with the connection unlocked
     final synchronized void startReconnect() {
         // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
index 4764f24..d66b748 100644 (file)
@@ -384,7 +384,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     final void completeTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
-            proxies.remove(tx.getIdentifier());
+            // Removal will be completed once purge completes
             LOG.debug("Proxy {} completing transaction {}", this, tx);
             onTransactionCompleted(tx);
         } finally {
@@ -392,6 +392,16 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    void purgeTransaction(final AbstractProxyTransaction tx) {
+        lock.lock();
+        try {
+            proxies.remove(tx.getIdentifier());
+            LOG.debug("Proxy {} purged transaction {}", this, tx);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     final void close() {
         lock.lock();
         try {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.