BUG-8403: propagate DONE state to successor 13/58213/2
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 29 May 2017 20:53:53 +0000 (22:53 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 15:32:07 +0000 (17:32 +0200)
We need correct accounting for DONE non-standalone local transactions,
as such transactions do not interact with open/closed semantics.

Propagate DONE via a simple flag, which we check in local ProxyHistory
a create a proxy without a backing modification.

Change-Id: Ie921db8c9e40f30934c119b74c31ca5418b61548
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 59ffaa4e95ffc8f8f04d1ca8d4f45f2ac1ef23b6)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohortTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohortTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransactionTest.java

index ac91f2d..cb56864 100644 (file)
@@ -248,11 +248,18 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * variable. It uses pre-allocated objects for fast paths (i.e. no successor present) and a per-transition object
      * for slow paths (when successor is injected/present).
      */
-    private volatile int sealed = 0;
-    private volatile State state = OPEN;
+    private volatile int sealed;
+    private volatile State state;
 
-    AbstractProxyTransaction(final ProxyHistory parent) {
+    AbstractProxyTransaction(final ProxyHistory parent, final boolean isDone) {
         this.parent = Preconditions.checkNotNull(parent);
+        if (isDone) {
+            state = DONE;
+            // DONE implies previous seal operation completed
+            sealed = 1;
+        } else {
+            state = OPEN;
+        }
     }
 
     final void executeInActor(final Runnable command) {
@@ -619,9 +626,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         final SuccessorState local = getSuccessorState();
         final State prevState = local.getPrevState();
 
+        final boolean isDone = DONE.equals(state)
+                || state instanceof SuccessorState && ((SuccessorState) state).isDone();
         final AbstractProxyTransaction successor = successorHistory.createTransactionProxy(getIdentifier(),
-            isSnapshotOnly());
-        LOG.debug("{} created successor transaction proxy {}", this, successor);
+            isSnapshotOnly(), isDone);
+        LOG.debug("{} created successor {}", this, successor);
         local.setSuccessor(successor);
 
         // Replay successful requests first
index 02f8e5a..feba5fd 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import java.util.function.Consumer;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
@@ -58,8 +59,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
 
     private final TransactionIdentifier identifier;
 
-    LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
-        super(parent);
+    LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean isDone) {
+        super(parent, isDone);
         this.identifier = Preconditions.checkNotNull(identifier);
     }
 
@@ -68,7 +69,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         return identifier;
     }
 
-    abstract DataTreeSnapshot readOnlyView();
+    abstract @Nonnull DataTreeSnapshot readOnlyView();
 
     abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request,
             @Nullable Consumer<Response<?, ?>> callback);
index 4fed10d..7cc245a 100644 (file)
@@ -32,10 +32,16 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
 
     LocalReadOnlyProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
         final DataTreeSnapshot snapshot) {
-        super(parent, identifier);
+        super(parent, identifier, false);
         this.snapshot = Preconditions.checkNotNull(snapshot);
     }
 
+    LocalReadOnlyProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
+        super(parent, identifier, true);
+        // It is an error to touch snapshot once we are DONE
+        this.snapshot = null;
+    }
+
     @Override
     boolean isSnapshotOnly() {
         return true;
@@ -43,7 +49,7 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
 
     @Override
     DataTreeSnapshot readOnlyView() {
-        return snapshot;
+        return Preconditions.checkNotNull(snapshot, "Transaction %s is DONE", getIdentifier());
     }
 
     @Override
index cbf316d..eee4fd0 100644 (file)
@@ -13,6 +13,7 @@ import java.util.Optional;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
@@ -84,10 +85,16 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
 
     LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
         final DataTreeSnapshot snapshot) {
-        super(parent, identifier);
+        super(parent, identifier, false);
         this.modification = (CursorAwareDataTreeModification) snapshot.newModification();
     }
 
+    LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
+        super(parent, identifier, true);
+        // This is DONE transaction, this should never be touched
+        this.modification = null;
+    }
+
     @Override
     boolean isSnapshotOnly() {
         return false;
@@ -321,12 +328,12 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
         closedException = this::abortedException;
     }
 
-    private CursorAwareDataTreeModification getModification() {
+    private @Nonnull CursorAwareDataTreeModification getModification() {
         if (closedException != null) {
             throw closedException.get();
         }
 
-        return modification;
+        return Preconditions.checkNotNull(modification, "Transaction %s is DONE", getIdentifier());
     }
 
     private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
index d66b748..e75a2df 100644 (file)
@@ -86,9 +86,15 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
 
+            if (isDone) {
+                // Done transactions do not register on our radar on should not have any state associated.
+                return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
+                        : new LocalReadWriteProxyTransaction(this, txId);
+            }
+
             // onTransactionCompleted() runs concurrently
             final LocalReadWriteProxyTransaction localSealed = lastSealed;
             final DataTreeSnapshot baseSnapshot;
@@ -145,7 +151,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
             final DataTreeSnapshot snapshot = takeSnapshot();
             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
@@ -165,8 +171,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
-            return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
         }
 
         @Override
@@ -183,8 +189,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
-            return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
         }
 
         @Override
@@ -352,16 +358,21 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         return parent;
     }
 
-    AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
             final boolean snapshotOnly) {
+        return createTransactionProxy(txId, snapshotOnly, false);
+    }
+
+    AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
+            final boolean isDone) {
         lock.lock();
         try {
             if (successor != null) {
-                return successor.createTransactionProxy(txId, snapshotOnly);
+                return successor.createTransactionProxy(txId, snapshotOnly, isDone);
             }
 
             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
-            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
+            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
             proxies.put(proxyId, ret);
             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
             return ret;
@@ -429,7 +440,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
     @GuardedBy("lock")
     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
-            TransactionIdentifier txId, boolean snapshotOnly);
+            TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
 
     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
 
index 6e54695..3b5f80f 100644 (file)
@@ -78,8 +78,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     private volatile Exception operationFailure;
 
     RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
-            final boolean snapshotOnly, final boolean sendReadyOnSeal) {
-        super(parent);
+            final boolean snapshotOnly, final boolean sendReadyOnSeal, final boolean isDone) {
+        super(parent, isDone);
         this.snapshotOnly = snapshotOnly;
         this.sendReadyOnSeal = sendReadyOnSeal;
         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
index f906c76..0f51c8e 100644 (file)
@@ -185,7 +185,7 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
         transaction.startReconnect();
 
         final ProxyHistory mockSuccessor = mock(ProxyHistory.class);
-        when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly()))
+        when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly(), false))
             .thenReturn(successor.getTransaction());
 
         transaction.replayMessages(mockSuccessor, entries);
@@ -321,7 +321,7 @@ public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransa
                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
         final RemoteProxyTransaction transaction =
-                new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+                new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
         return new TransactionTester<>(transaction, connection, backendProbe);
     }
 }
index 6fde068..1b90e4b 100644 (file)
@@ -175,7 +175,7 @@ public class ClientTransactionCommitCohortTest {
                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
         final RemoteProxyTransaction transaction =
-                new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+                new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
         return new TransactionTester<>(transaction, connection, backendProbe);
     }
 
index 24e898a..b26e86a 100644 (file)
@@ -103,7 +103,7 @@ public class DirectTransactionCommitCohortTest {
                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
         final RemoteProxyTransaction transaction =
-                new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+                new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
         return new TransactionTester<>(transaction, connection, backendProbe);
     }
 
index f646bed..c8bdd27 100644 (file)
@@ -47,7 +47,7 @@ public class RemoteProxyTransactionTest extends AbstractProxyTransactionTest<Rem
     @Override
     protected RemoteProxyTransaction createTransaction(final ProxyHistory parent, final TransactionIdentifier id,
                                                        final DataTreeSnapshot snapshot) {
-        return new RemoteProxyTransaction(parent, TRANSACTION_ID, false, false);
+        return new RemoteProxyTransaction(parent, TRANSACTION_ID, false, false, false);
     }
 
     @Override

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.