BUG-8403: move successor allocation to AbstractProxyTransaction 10/58210/1
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 30 May 2017 17:35:17 +0000 (19:35 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 07:47:42 +0000 (09:47 +0200)
We still have a tiny race window where we do not correctly handle
reconnection, leading to an ISE splat -- this time if the final
stage of transaction completion.

The problem is that a reconnect attempt is happening while we are
waiting for the transaction purge to complete. At this point the
transaction has been completely resolved and the only remaining
task is to inform the backend that we have received all of the
state and hence it can throw it out (we will remove our state once
purge completes).

We are still allocating a live transaction in the local history
and the purge request replay does not logically close it, leading
to the splat.

To fix this, we really need to allocate a non-open tx successor,
which will not trip the successor local history. All the required
knowledge already resides in AbstractProxyHistory, except we do
not have a dedicated 'purging' state.

This makes the first step of moving the allocation caller to the
appropriate place.

Change-Id: If82957019b478f4d5132edda4d38e6bc026aa0ab
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit cc5009b8f3ea91f64ee48cda815c6a5e73a8a1af)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java

index bf56376fcee3d538d483a063d652ae6d3d5a5654..f4f2e19f76745685f04abdd190e3a79e07b32291 100644 (file)
@@ -543,9 +543,13 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     // Called with the connection locked
-    final void replayMessages(final AbstractProxyTransaction successor,
-            final Iterable<ConnectionEntry> enqueuedEntries) {
+    final void replayMessages(final ProxyHistory successorHistory, final Iterable<ConnectionEntry> enqueuedEntries) {
         final SuccessorState local = getSuccessorState();
+        final State prevState = local.getPrevState();
+
+        final AbstractProxyTransaction successor = successorHistory.createTransactionProxy(getIdentifier(),
+            isSnapshotOnly());
+        LOG.debug("{} created successor transaction proxy {}", this, successor);
         local.setSuccessor(successor);
 
         // Replay successful requests first
@@ -593,7 +597,6 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
          * reconnecting have been forced to slow paths, which will be unlocked once we unblock the state latch
          * at the end of this method.
          */
-        final State prevState = local.getPrevState();
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
             flushState(successor);
index 88a79775baa94730eba45a01ed8e229c0b701c69..4764f24d495991f0082526fef2b569a62b828835 100644 (file)
@@ -231,11 +231,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             }
 
             for (AbstractProxyTransaction t : proxies.values()) {
-                LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
-                final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
-                    t.isSnapshotOnly());
-                LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
-                t.replayMessages(newProxy, previousEntries);
+                LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
+                t.replayMessages(successor, previousEntries);
             }
 
             // Now look for any finalizing messages
@@ -355,7 +352,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return parent;
     }
 
-    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+    AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
             final boolean snapshotOnly) {
         lock.lock();
         try {
index 92c996407a6d607c5e1e413705467f68f1d1d419..f906c76528a40ab285ce9064b1bc5d05467fca52 100644 (file)
@@ -183,7 +183,12 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
                 new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
         transaction.recordSuccessfulRequest(successful2);
         transaction.startReconnect();
-        transaction.replayMessages(successor.getTransaction(), entries);
+
+        final ProxyHistory mockSuccessor = mock(ProxyHistory.class);
+        when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly()))
+            .thenReturn(successor.getTransaction());
+
+        transaction.replayMessages(mockSuccessor, entries);
 
         final ModifyTransactionRequest transformed = successor.expectTransactionRequest(ModifyTransactionRequest.class);
         Assert.assertNotNull(transformed);