BUG-8403: go through the DONE transition 12/58212/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 31 May 2017 17:49:52 +0000 (19:49 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 07:47:42 +0000 (09:47 +0200)
Third step of the fix: make make AbstractProxyTransaction go through
the DONE state before retiring. This ends up also fixing breakage
in local chain transactions, which could end up leaking because we
never go back to just using the base data tree.

Change-Id: I97ac1687eaf3ecd8f46a68c6170891ea06703e95
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 9b4c07ca27b2c3c7ca5d5e790aa1f121ce4857f3)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java

index 549cab269fb7682b4575c141243dda12b028d23f..ac91f2d0413ad8aca71391ba86921c92dc1b008b 100644 (file)
@@ -119,11 +119,14 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * at which point the request is routed to the successor transaction. This is a relatively heavy-weight solution
      * to the problem of state transfer, but the user will observe it only if the race condition is hit.
      */
      * at which point the request is routed to the successor transaction. This is a relatively heavy-weight solution
      * to the problem of state transfer, but the user will observe it only if the race condition is hit.
      */
-    private static final class SuccessorState extends State {
+    private static class SuccessorState extends State {
         private final CountDownLatch latch = new CountDownLatch(1);
         private AbstractProxyTransaction successor;
         private State prevState;
 
         private final CountDownLatch latch = new CountDownLatch(1);
         private AbstractProxyTransaction successor;
         private State prevState;
 
+        // SUCCESSOR + DONE
+        private boolean done;
+
         SuccessorState() {
             super("SUCCESSOR");
         }
         SuccessorState() {
             super("SUCCESSOR");
         }
@@ -163,6 +166,14 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                     successor, this.successor);
             this.successor = Preconditions.checkNotNull(successor);
         }
                     successor, this.successor);
             this.successor = Preconditions.checkNotNull(successor);
         }
+
+        boolean isDone() {
+            return done;
+        }
+
+        void setDone() {
+            done = true;
+        }
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
@@ -544,6 +555,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
+
+            // Needed for ProxyHistory$Local data tree rebase points.
+            parent.completeTransaction(this);
+
             enqueuePurge();
         });
     }
             enqueuePurge();
         });
     }
@@ -558,20 +573,30 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
     }
 
     final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
-        enqueueRequest(purgeRequest(), resp -> {
-            LOG.debug("Transaction {} purge completed", this);
-            parent.completeTransaction(this);
+        LOG.debug("{}: initiating purge", this);
+
+        final State prev = state;
+        if (prev instanceof SuccessorState) {
+            ((SuccessorState) prev).setDone();
+        } else {
+            final boolean success = STATE_UPDATER.compareAndSet(this, prev, DONE);
+            if (!success) {
+                LOG.warn("{}: moved from state {} while we were purging it", this, prev);
+            }
+        }
+
+        successfulRequests.clear();
+
+        enqueueRequest(new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()), resp -> {
+            LOG.debug("{}: purge completed", this);
+            parent.purgeTransaction(this);
+
             if (callback != null) {
                 callback.accept(resp);
             }
         }, enqueuedTicks);
     }
 
             if (callback != null) {
                 callback.accept(resp);
             }
         }, enqueuedTicks);
     }
 
-    private TransactionPurgeRequest purgeRequest() {
-        successfulRequests.clear();
-        return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
-    }
-
     // Called with the connection unlocked
     final synchronized void startReconnect() {
         // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
     // Called with the connection unlocked
     final synchronized void startReconnect() {
         // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
index 4764f24d495991f0082526fef2b569a62b828835..d66b748f5bf6da559261b030e31f552833bdd03c 100644 (file)
@@ -384,7 +384,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     final void completeTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
     final void completeTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
-            proxies.remove(tx.getIdentifier());
+            // Removal will be completed once purge completes
             LOG.debug("Proxy {} completing transaction {}", this, tx);
             onTransactionCompleted(tx);
         } finally {
             LOG.debug("Proxy {} completing transaction {}", this, tx);
             onTransactionCompleted(tx);
         } finally {
@@ -392,6 +392,16 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
         }
     }
 
+    void purgeTransaction(final AbstractProxyTransaction tx) {
+        lock.lock();
+        try {
+            proxies.remove(tx.getIdentifier());
+            LOG.debug("Proxy {} purged transaction {}", this, tx);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     final void close() {
         lock.lock();
         try {
     final void close() {
         lock.lock();
         try {