BUG-5280: fix invalid local transaction replay 27/54327/9
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 5 Apr 2017 16:41:14 +0000 (18:41 +0200)
committerRobert Varga <nite@hq.sk>
Tue, 18 Apr 2017 08:27:08 +0000 (08:27 +0000)
When we transition from a connecting to connected local connection,
we may encounter operations which are invalid and these violations
are detected during transaction replay.

If such replay fails, we need to suppress reporting the error until
the user initiates canCommit or directCommit, at which point we need
to report the delayed failure.

For reasons of consistency, we perform this suppression even under
normal connected circumstances.

Change-Id: I2018498afff0e463dbdceaec5c50e8ebf088001b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
16 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CommitLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/CommitLocalTransactionRequestTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java

index 48f5cc5..5cff1a8 100644 (file)
@@ -12,7 +12,9 @@ import com.google.common.annotations.Beta;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Optional;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
@@ -31,15 +33,28 @@ public final class CommitLocalTransactionRequest
             + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
             + "aren't serialized. FindBugs does not recognize this.")
     private final DataTreeModification mod;
+    private final Exception delayedFailure;
     private final boolean coordinated;
 
     public CommitLocalTransactionRequest(@Nonnull final TransactionIdentifier identifier, final long sequence,
-            @Nonnull final ActorRef replyTo, @Nonnull final DataTreeModification mod, final boolean coordinated) {
+            @Nonnull final ActorRef replyTo, @Nonnull final DataTreeModification mod,
+            @Nullable final Exception delayedFailure, final boolean coordinated) {
         super(identifier, sequence, replyTo);
         this.mod = Preconditions.checkNotNull(mod);
+        this.delayedFailure = delayedFailure;
         this.coordinated = coordinated;
     }
 
+    /**
+     * Return the delayed error detected on the frontend. If this error is present, it will be reported as the result
+     * of the first step of the commit process.
+     *
+     * @return Delayed failure, if present.
+     */
+    public Optional<Exception> getDelayedFailure() {
+        return Optional.ofNullable(delayedFailure);
+    }
+
     public DataTreeModification getModification() {
         return mod;
     }
@@ -58,6 +73,7 @@ public final class CommitLocalTransactionRequest
 
     @Override
     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-        return super.addToStringAttributes(toStringHelper).add("coordinated", coordinated);
+        return super.addToStringAttributes(toStringHelper).add("coordinated", coordinated)
+                .add("delayedError", delayedFailure);
     }
 }
index d8dc842..61b7dc2 100644 (file)
@@ -31,7 +31,7 @@ public class CommitLocalTransactionRequestTest
     private static final boolean COORDINATED = true;
 
     private static final CommitLocalTransactionRequest OBJECT = new CommitLocalTransactionRequest(
-            TRANSACTION, 0, ACTOR_REF, MODIFICATION, COORDINATED);
+            TRANSACTION, 0, ACTOR_REF, MODIFICATION, null, COORDINATED);
 
     @Override
     protected CommitLocalTransactionRequest object() {
index e2e93de..a41bef9 100644 (file)
@@ -103,7 +103,13 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             return;
         }
 
-        mod.delete(path);
+        try {
+            mod.delete(path);
+        } catch (Exception e) {
+            LOG.debug("Transaction {} delete on {} incurred failure, delaying it until commit", getIdentifier(), path,
+                e);
+            recordedFailure = e;
+        }
     }
 
     @Override
@@ -115,7 +121,13 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             return;
         }
 
-        mod.merge(path, data);
+        try {
+            mod.merge(path, data);
+        } catch (Exception e) {
+            LOG.debug("Transaction {} merge to {} incurred failure, delaying it until commit", getIdentifier(), path,
+                e);
+            recordedFailure = e;
+        }
     }
 
     @Override
@@ -127,7 +139,13 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             return;
         }
 
-        mod.write(path, data);
+        try {
+            mod.write(path, data);
+        } catch (Exception e) {
+            LOG.debug("Transaction {} write to {} incurred failure, delaying it until commit", getIdentifier(), path,
+                e);
+            recordedFailure = e;
+        }
     }
 
     private RuntimeException abortedException() {
@@ -142,7 +160,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
     CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
         final CursorAwareDataTreeModification mod = getModification();
         final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(getIdentifier(), nextSequence(),
-            localActor(), mod, coordinated);
+            localActor(), mod, recordedFailure, coordinated);
         closedException = this::submittedException;
         return ret;
     }
@@ -185,11 +203,11 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             final @Nullable Consumer<Response<?, ?>> callback) {
         for (final TransactionModification mod : request.getModifications()) {
             if (mod instanceof TransactionWrite) {
-                getModification().write(mod.getPath(), ((TransactionWrite)mod).getData());
+                write(mod.getPath(), ((TransactionWrite)mod).getData());
             } else if (mod instanceof TransactionMerge) {
-                getModification().merge(mod.getPath(), ((TransactionMerge)mod).getData());
+                merge(mod.getPath(), ((TransactionMerge)mod).getData());
             } else if (mod instanceof TransactionDelete) {
-                getModification().delete(mod.getPath());
+                delete(mod.getPath());
             } else {
                 throw new IllegalArgumentException("Unsupported modification " + mod);
             }
index aef83ae..6ca5888 100644 (file)
@@ -192,6 +192,9 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
     abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod)
         throws RequestException;
 
+    abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier id, DataTreeModification mod,
+            Exception failure);
+
     abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
 
     @Override
index f9d9580..0990953 100644 (file)
@@ -215,7 +215,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
         readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
             @Override
             public void onSuccess(final UnsignedLong result) {
                 successfulCommit(envelope, startTime);
@@ -237,16 +236,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
-        if (sealedModification.equals(request.getModification())) {
+        if (!sealedModification.equals(request.getModification())) {
+            LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
+            throw new UnsupportedRequestException(request);
+        }
+
+        final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
+        if (optFailure.isPresent()) {
+            readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get());
+        } else {
             readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+        }
 
-            if (request.isCoordinated()) {
-                coordinatedCommit(envelope, now);
-            } else {
-                directCommit(envelope, now);
-            }
+        if (request.isCoordinated()) {
+            coordinatedCommit(envelope, now);
         } else {
-            throw new UnsupportedRequestException(request);
+            directCommit(envelope, now);
         }
     }
 
index 94c0965..20d18e1 100644 (file)
@@ -69,6 +69,12 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
         return FrontendReadWriteTransaction.createReady(this, id, mod);
     }
 
+    @Override
+    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier id, final DataTreeModification mod,
+            final Exception failure) {
+        return chain.createFailedCohort(id, mod, failure);
+    }
+
     @Override
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
         return chain.createReadyCohort(id, mod);
index 36a6a70..9abb002 100644 (file)
@@ -732,6 +732,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
             Exception cause;
             try {
+                cohort.throwCanCommitFailure();
+
                 tip.validate(modification);
                 LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
                 cohort.successfulCanCommit();
@@ -946,10 +948,19 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         cohortRegistry.process(sender, message);
     }
 
+
+    @Override
+    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final Exception failure) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure);
+        pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
+        return cohort;
+    }
+
     @Override
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
-            final DataTreeModification modification) {
-        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
+            final DataTreeModification mod) {
+        SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId,
                 cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
         pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
         return cohort;
index 794c6d8..f2e9af3 100644 (file)
@@ -119,7 +119,13 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification modification) {
-        return dataTree.createReadyCohort(txId, modification);
+    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final Exception failure) {
+        return dataTree.createFailedCohort(txId, mod, failure);
+    }
+
+    @Override
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+        return dataTree.createReadyCohort(txId, mod);
     }
 }
index a280a22..23fa0c2 100644 (file)
@@ -18,5 +18,8 @@ abstract class ShardDataTreeTransactionParent {
 
     abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
 
-    abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
+    abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier txId, DataTreeModification mod);
+
+    abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier txId, DataTreeModification mod,
+            Exception failure);
 }
index 8d947e8..2a8fdbe 100644 (file)
@@ -24,7 +24,34 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
-final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+    static final class DeadOnArrival extends SimpleShardDataTreeCohort {
+        private final Exception failure;
+
+        DeadOnArrival(final ShardDataTree dataTree, final DataTreeModification transaction,
+            final TransactionIdentifier transactionId, final Exception failure) {
+            super(dataTree, transaction, transactionId, null);
+            this.failure = Preconditions.checkNotNull(failure);
+        }
+
+        @Override
+        void throwCanCommitFailure() throws Exception {
+            throw failure;
+        }
+    }
+
+    static final class Normal extends SimpleShardDataTreeCohort {
+        Normal(final ShardDataTree dataTree, final DataTreeModification transaction,
+            final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
+            super(dataTree, transaction, transactionId, Preconditions.checkNotNull(userCohorts));
+        }
+
+        @Override
+        void throwCanCommitFailure() {
+            // No-op
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
 
     private final DataTreeModification transaction;
@@ -42,7 +69,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         this.dataTree = Preconditions.checkNotNull(dataTree);
         this.transaction = Preconditions.checkNotNull(transaction);
         this.transactionId = Preconditions.checkNotNull(transactionId);
-        this.userCohorts = Preconditions.checkNotNull(userCohorts);
+        this.userCohorts = userCohorts;
     }
 
     @Override
@@ -145,7 +172,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         return ret;
     }
 
-    void setNewCandidate(DataTreeCandidateTip dataTreeCandidate) {
+    void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) {
         checkState(State.PRE_COMMIT_COMPLETE);
         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
     }
@@ -221,6 +248,13 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         this.nextFailure = Preconditions.checkNotNull(cause);
     }
 
+    /**
+     * If there is an initial failure, throw it so the caller can process it.
+     *
+     * @throws Exception reported failure.
+     */
+    abstract void throwCanCommitFailure() throws Exception;
+
     @Override
     public boolean isFailed() {
         return state == State.FAILED || nextFailure != null;
index f08ff4a..7fd53c7 100644 (file)
@@ -72,6 +72,12 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory {
         return FrontendReadWriteTransaction.createReady(this, id, mod);
     }
 
+    @Override
+    ShardDataTreeCohort createFailedCohort(final TransactionIdentifier id, final DataTreeModification mod,
+            final Exception failure) {
+        return tree.createFailedCohort(id, mod, failure);
+    }
+
     @Override
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
         return tree.createReadyCohort(id, mod);
index 89a4bda..6f66b3e 100644 (file)
@@ -112,7 +112,7 @@ public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
         final TestProbe probe = createProbe();
         final CursorAwareDataTreeModification modification = mock(CursorAwareDataTreeModification.class);
         final CommitLocalTransactionRequest request =
-                new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), modification, true);
+                new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), modification, null, true);
         doAnswer(this::applyToCursorAnswer).when(modification).applyToCursor(any());
         final ModifyTransactionRequest modifyRequest = testForwardToRemote(request, ModifyTransactionRequest.class);
         verify(modification).applyToCursor(any());
index 4b07f3a..9cec79c 100644 (file)
@@ -209,7 +209,7 @@ public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTes
         final TestProbe probe = createProbe();
         final DataTreeModification mod = mock(DataTreeModification.class);
         final TransactionRequest<?> request =
-                new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), mod, false);
+                new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), mod, null, false);
         testForwardToLocal(request, CommitLocalTransactionRequest.class);
     }
 
index 20dd17d..31e36cf 100644 (file)
@@ -45,7 +45,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -1064,9 +1063,6 @@ public class DistributedDataStoreIntegrationTest {
 
     @Test
     public void testChainedTransactionFailureWithSingleShard() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
-
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
                 try (final AbstractDataStore dataStore = setupAbstractDataStore(
@@ -1110,9 +1106,6 @@ public class DistributedDataStoreIntegrationTest {
 
     @Test
     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
-
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
                 try (final AbstractDataStore dataStore = setupAbstractDataStore(
index 5fef104..5b21871 100644 (file)
@@ -62,7 +62,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
         doNothing().when(mockUserCohorts).commit();
         doReturn(Optional.empty()).when(mockUserCohorts).abort();
 
-        cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(),
+        cohort = new SimpleShardDataTreeCohort.Normal(mockShardDataTree, mockModification, nextTransactionId(),
             mockUserCohorts);
     }
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.