BUG-5280: fix invalid local transaction replay 27/54327/9
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 5 Apr 2017 16:41:14 +0000 (18:41 +0200)
committerRobert Varga <nite@hq.sk>
Tue, 18 Apr 2017 08:27:08 +0000 (08:27 +0000)
When we transition from a connecting to connected local connection,
we may encounter operations which are invalid and these violations
are detected during transaction replay.

If such replay fails, we need to suppress reporting the error until
the user initiates canCommit or directCommit, at which point we need
to report the delayed failure.

For reasons of consistency, we perform this suppression even under
normal connected circumstances.

Change-Id: I2018498afff0e463dbdceaec5c50e8ebf088001b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
16 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CommitLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/CommitLocalTransactionRequestTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java

index 48f5cc58718f9cd4f4693f5dc5acd71d93b86a0f..5cff1a8f5cd9482f8bb330f49c524773d05f68d2 100644 (file)
@@ -12,7 +12,9 @@ import com.google.common.annotations.Beta;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Optional;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
@@ -31,15 +33,28 @@ public final class CommitLocalTransactionRequest
             + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
             + "aren't serialized. FindBugs does not recognize this.")
     private final DataTreeModification mod;
+    private final Exception delayedFailure;
     private final boolean coordinated;
 
     public CommitLocalTransactionRequest(@Nonnull final TransactionIdentifier identifier, final long sequence,
-            @Nonnull final ActorRef replyTo, @Nonnull final DataTreeModification mod, final boolean coordinated) {
+            @Nonnull final ActorRef replyTo, @Nonnull final DataTreeModification mod,
+            @Nullable final Exception delayedFailure, final boolean coordinated) {
         super(identifier, sequence, replyTo);
         this.mod = Preconditions.checkNotNull(mod);
+        this.delayedFailure = delayedFailure;
         this.coordinated = coordinated;
     }
 
+    /**
+     * Return the delayed error detected on the frontend. If this error is present, it will be reported as the result
+     * of the first step of the commit process.
+     *
+     * @return Delayed failure, if present.
+     */
+    public Optional<Exception> getDelayedFailure() {
+        return Optional.ofNullable(delayedFailure);
+    }
+
     public DataTreeModification getModification() {
         return mod;
     }
@@ -58,6 +73,7 @@ public final class CommitLocalTransactionRequest
 
     @Override
     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-        return super.addToStringAttributes(toStringHelper).add("coordinated", coordinated);
+        return super.addToStringAttributes(toStringHelper).add("coordinated", coordinated)
+                .add("delayedError", delayedFailure);
     }
 }
index d8dc84293a32e4f2724028bde4ce93073c08de53..61b7dc2ad40b2e95ca6587b3ecc8789b11b4ca3a 100644 (file)
@@ -31,7 +31,7 @@ public class CommitLocalTransactionRequestTest
     private static final boolean COORDINATED = true;
 
     private static final CommitLocalTransactionRequest OBJECT = new CommitLocalTransactionRequest(
-            TRANSACTION, 0, ACTOR_REF, MODIFICATION, COORDINATED);
+            TRANSACTION, 0, ACTOR_REF, MODIFICATION, null, COORDINATED);
 
     @Override
     protected CommitLocalTransactionRequest object() {
index e2e93de98ccf671df3dab6d4bd4a5148d5abcb53..a41bef9c9e992518c672c418da82769ea6cfdfc9 100644 (file)
@@ -103,7 +103,13 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             return;
         }
 
-        mod.delete(path);
+        try {
+            mod.delete(path);
+        } catch (Exception e) {
+            LOG.debug("Transaction {} delete on {} incurred failure, delaying it until commit", getIdentifier(), path,
+                e);
+            recordedFailure = e;
+        }
     }
 
     @Override
@@ -115,7 +121,13 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             return;
         }
 
-        mod.merge(path, data);
+        try {
+            mod.merge(path, data);
+        } catch (Exception e) {
+            LOG.debug("Transaction {} merge to {} incurred failure, delaying it until commit", getIdentifier(), path,
+                e);
+            recordedFailure = e;
+        }
     }
 
     @Override
@@ -127,7 +139,13 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             return;
         }
 
-        mod.write(path, data);
+        try {
+            mod.write(path, data);
+        } catch (Exception e) {
+            LOG.debug("Transaction {} write to {} incurred failure, delaying it until commit", getIdentifier(), path,
+                e);
+            recordedFailure = e;
+        }
     }
 
     private RuntimeException abortedException() {
@@ -142,7 +160,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
         final CursorAwareDataTreeModification mod = getModification();
         final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(getIdentifier(), nextSequence(),
-            localActor(), mod, coordinated);
+            localActor(), mod, recordedFailure, coordinated);
         closedException = this::submittedException;
         return ret;
     }
@@ -185,11 +203,11 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             final @Nullable Consumer<Response<?, ?>> callback) {
         for (final TransactionModification mod : request.getModifications()) {
             if (mod instanceof TransactionWrite) {
-                getModification().write(mod.getPath(), ((TransactionWrite)mod).getData());
+                write(mod.getPath(), ((TransactionWrite)mod).getData());
             } else if (mod instanceof TransactionMerge) {
-                getModification().merge(mod.getPath(), ((TransactionMerge)mod).getData());
+                merge(mod.getPath(), ((TransactionMerge)mod).getData());
             } else if (mod instanceof TransactionDelete) {
-                getModification().delete(mod.getPath());
+                delete(mod.getPath());
             } else {
                 throw new IllegalArgumentException("Unsupported modification " + mod);
             }
index aef83aee6704a0d26585cf549949d4f1c93a332c..6ca58888d3a830e772dc07fd95988bac0ef42e1e 100644 (file)
@@ -192,6 +192,9 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
     abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod)
         throws RequestException;
 
+    abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier id, DataTreeModification mod,
+            Exception failure);
+
     abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
 
     @Override
index f9d9580214e41c2d28430c3a46c68b005ad6e8bc..099095325c2a97cb452858f2cf60957a09f2c31e 100644 (file)
@@ -215,7 +215,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
         readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
             @Override
             public void onSuccess(final UnsignedLong result) {
                 successfulCommit(envelope, startTime);
@@ -237,16 +236,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
-        if (sealedModification.equals(request.getModification())) {
+        if (!sealedModification.equals(request.getModification())) {
+            LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
+            throw new UnsupportedRequestException(request);
+        }
+
+        final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
+        if (optFailure.isPresent()) {
+            readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get());
+        } else {
             readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+        }
 
-            if (request.isCoordinated()) {
-                coordinatedCommit(envelope, now);
-            } else {
-                directCommit(envelope, now);
-            }
+        if (request.isCoordinated()) {
+            coordinatedCommit(envelope, now);
         } else {
-            throw new UnsupportedRequestException(request);
+            directCommit(envelope, now);
         }
     }
 
index 94c0965c007178828cdc04cdeea3170962abdbf8..20d18e156e142a80467a41fad459969ba6f83d79 100644 (file)
@@ -69,6 +69,12 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
         return FrontendReadWriteTransaction.createReady(this, id, mod);
     }
 
+    @Override
+    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier id, final DataTreeModification mod,
+            final Exception failure) {
+        return chain.createFailedCohort(id, mod, failure);
+    }
+
     @Override
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
         return chain.createReadyCohort(id, mod);
index 36a6a70f38717b755fbf44a39795de3eca46fb21..9abb00292f1ac682eaaebb3637de0c44c80f9614 100644 (file)
@@ -732,6 +732,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
+                cohort.throwCanCommitFailure();
+
                 tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
@@ -946,10 +948,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         cohortRegistry.process(sender, message);
     }
 
+
+    @Override
+    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final Exception failure) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure);
+        pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
+        return cohort;
+    }
+
     @Override
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
-            final DataTreeModification modification) {
-        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
+            final DataTreeModification mod) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId,
                 cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
         pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
         return cohort;
index 794c6d82187d94f1c29a4f3bd8e25bd598cb5ca8..f2e9af3656abdc6bf66f36f645e736d15b8d2168 100644 (file)
@@ -119,7 +119,13 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification modification) {
-        return dataTree.createReadyCohort(txId, modification);
+    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final Exception failure) {
+        return dataTree.createFailedCohort(txId, mod, failure);
+    }
+
+    @Override
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+        return dataTree.createReadyCohort(txId, mod);
     }
 }
index a280a2229111082c4abac657042a468971c957cc..23fa0c286a3c9728ff3c32a80155f88a0a176a0a 100644 (file)
@@ -18,5 +18,8 @@ abstract class ShardDataTreeTransactionParent {
 
     abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
 
-    abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
+    abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier txId, DataTreeModification mod);
+
+    abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier txId, DataTreeModification mod,
+            Exception failure);
 }
index 8d947e8c561e175e29472f1d143eca3c13b22bc4..2a8fdbe3dbdfbd6d0b675c311de83d5528ae6686 100644 (file)
@@ -24,7 +24,34 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
-final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+    static final class DeadOnArrival extends SimpleShardDataTreeCohort {
+        private final Exception failure;
+
+        DeadOnArrival(final ShardDataTree dataTree, final DataTreeModification transaction,
+            final TransactionIdentifier transactionId, final Exception failure) {
+            super(dataTree, transaction, transactionId, null);
+            this.failure = Preconditions.checkNotNull(failure);
+        }
+
+        @Override
+        void throwCanCommitFailure() throws Exception {
+            throw failure;
+        }
+    }
+
+    static final class Normal extends SimpleShardDataTreeCohort {
+        Normal(final ShardDataTree dataTree, final DataTreeModification transaction,
+            final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
+            super(dataTree, transaction, transactionId, Preconditions.checkNotNull(userCohorts));
+        }
+
+        @Override
+        void throwCanCommitFailure() {
+            // No-op
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
 
     private final DataTreeModification transaction;
@@ -42,7 +69,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         this.dataTree = Preconditions.checkNotNull(dataTree);
         this.transaction = Preconditions.checkNotNull(transaction);
         this.transactionId = Preconditions.checkNotNull(transactionId);
-        this.userCohorts = Preconditions.checkNotNull(userCohorts);
+        this.userCohorts = userCohorts;
     }
 
     @Override
@@ -145,7 +172,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         return ret;
     }
 
-    void setNewCandidate(DataTreeCandidateTip dataTreeCandidate) {
+    void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) {
         checkState(State.PRE_COMMIT_COMPLETE);
         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
     }
@@ -221,6 +248,13 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         this.nextFailure = Preconditions.checkNotNull(cause);
     }
 
+    /**
+     * If there is an initial failure, throw it so the caller can process it.
+     *
+     * @throws Exception reported failure.
+     */
+    abstract void throwCanCommitFailure() throws Exception;
+
     @Override
     public boolean isFailed() {
         return state == State.FAILED || nextFailure != null;
index f08ff4a445b2adbdd2821e63311a6ba5b84728c7..7fd53c7b7cb66f72805e6b3939a57a36be78a4c3 100644 (file)
@@ -72,6 +72,12 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory {
         return FrontendReadWriteTransaction.createReady(this, id, mod);
     }
 
+    @Override
+    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier id, final DataTreeModification mod,
+            final Exception failure) {
+        return tree.createFailedCohort(id, mod, failure);
+    }
+
     @Override
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
         return tree.createReadyCohort(id, mod);
index 89a4bda4d1e94cddb164b039ca7766b0d17bdb15..6f66b3e933134b3bbcd048b970631b04a0b4d24e 100644 (file)
@@ -112,7 +112,7 @@ public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
         final TestProbe probe = createProbe();
         final CursorAwareDataTreeModification modification = mock(CursorAwareDataTreeModification.class);
         final CommitLocalTransactionRequest request =
-                new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), modification, true);
+                new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), modification, null, true);
         doAnswer(this::applyToCursorAnswer).when(modification).applyToCursor(any());
         final ModifyTransactionRequest modifyRequest = testForwardToRemote(request, ModifyTransactionRequest.class);
         verify(modification).applyToCursor(any());
index 4b07f3ae9f0c7fd2a2b78223a6f8573faf3462c6..9cec79c9f8d5897ddcc22065976463b444576887 100644 (file)
@@ -209,7 +209,7 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
         final TestProbe probe = createProbe();
         final DataTreeModification mod = mock(DataTreeModification.class);
         final TransactionRequest<?> request =
-                new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), mod, false);
+                new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), mod, null, false);
         testForwardToLocal(request, CommitLocalTransactionRequest.class);
     }
 
index 20dd17d15c7e70b49d1e99c427151c215a18db4f..31e36cf679ea58454643bfebdf0eb71ca142e9ec 100644 (file)
@@ -45,7 +45,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -1064,9 +1063,6 @@ public class DistributedDataStoreIntegrationTest {
 
     @Test
     public void testChainedTransactionFailureWithSingleShard() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
-
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
                 try (final AbstractDataStore dataStore = setupAbstractDataStore(
@@ -1110,9 +1106,6 @@ public class DistributedDataStoreIntegrationTest {
 
     @Test
     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
-
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
                 try (final AbstractDataStore dataStore = setupAbstractDataStore(
index 5fef104eaa33d116d2a31bf9fbe0f692504a493c..5b218715737b6a34032aacc0beb01826e2da1444 100644 (file)
@@ -62,7 +62,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
         doNothing().when(mockUserCohorts).commit();
         doReturn(Optional.empty()).when(mockUserCohorts).abort();
 
-        cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(),
+        cohort = new SimpleShardDataTreeCohort.Normal(mockShardDataTree, mockModification, nextTransactionId(),
             mockUserCohorts);
     }