Bug 3195: Cleanup on error paths and error handling 11/23911/2
authorTom Pantelis <tpanteli@brocade.com>
Mon, 15 Jun 2015 23:38:06 +0000 (19:38 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 8 Jul 2015 16:33:39 +0000 (16:33 +0000)
Modified ShardCommitCoordinator#handleBatchedModifications to remove the
cohortEntry from the cache if the total messages sent doesn't match the
total received.

With recent yangtools changes, write and merge on a DOM tx can throw
unchecked exceptions if he data is invalid. Modified the front-end
local tx path to catch unchecked exceptions in LocalTransactionContext
and propagate to the LocalThreePhaseCommit to immediately fail the
ready.

Similarly modified ShardCommitCoordinator#handleBatchedModifications to
handle unchecked exceptions from applied operations and propagate when
the tx is readied.

Added unit tests to cover these cases.

Also, modified LocalTransactionContext#readyTransaction to handle
unchecked exceptions from the DOM tx ready.

Change-Id: Ib6fe6e04b8626bf996cfabfe74da780f05ce838a
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 3cd149e98ed5260b46006ff474bebe96f198756f)

12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionReadySupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 1e085523fdc0599e21a320e4d721abdba9bd13a1..5f9cc4a0d21768ee922b99dd70d0111b7f9508bf 100644 (file)
@@ -181,7 +181,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         switch(parent.getType()) {
             case READ_ONLY:
                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(readOnly, parent.getIdentifier()) {
+                return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         throw new UnsupportedOperationException();
@@ -194,7 +194,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case READ_WRITE:
                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(readWrite, parent.getIdentifier()) {
+                return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return readWrite;
@@ -207,7 +207,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case WRITE_ONLY:
                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(writeOnly, parent.getIdentifier()) {
+                return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return writeOnly;
index 4e085399d2093d2bc84661e1d6f8679fd73f59a1..569bacceaa8f600f4110ddafb106f34dbccac881 100644 (file)
@@ -8,10 +8,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
-import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
@@ -30,13 +30,14 @@ import scala.concurrent.Future;
  * It is not actually called by the front-end to perform 3PC thus the canCommit/preCommit/commit methods
  * are no-ops.
  */
-abstract class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
     private static final Logger LOG = LoggerFactory.getLogger(LocalThreePhaseCommitCohort.class);
 
     private final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction;
     private final DataTreeModification modification;
     private final ActorContext actorContext;
     private final ActorSelection leader;
+    private Exception operationError;
 
     protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
             final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final DataTreeModification modification) {
@@ -46,19 +47,27 @@ abstract class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCo
         this.modification = Preconditions.checkNotNull(modification);
     }
 
+    protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
+            final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final Exception operationError) {
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.leader = Preconditions.checkNotNull(leader);
+        this.transaction = Preconditions.checkNotNull(transaction);
+        this.operationError = Preconditions.checkNotNull(operationError);
+        this.modification = null;
+    }
+
     private Future<Object> initiateCommit(final boolean immediate) {
+        if(operationError != null) {
+            return Futures.failed(operationError);
+        }
+
         final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier().toString(),
                 modification, immediate);
         return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
     }
 
-    /**
-     * Return the {@link ActorContext} associated with this object.
-     *
-     * @return An actor context instance.
-     */
-    @Nonnull ActorContext getActorContext() {
-        return actorContext;
+    void setOperationError(Exception operationError) {
+        this.operationError = operationError;
     }
 
     Future<ActorSelection> initiateCoordinatedCommit() {
@@ -126,6 +135,9 @@ abstract class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCo
         throw new UnsupportedOperationException();
     }
 
-    protected abstract void transactionAborted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction);
-    protected abstract void transactionCommitted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction);
+    protected void transactionAborted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+    }
+
+    protected void transactionCommitted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+    }
 }
index 39d0133d02f5f71eab1d509079f7fd0c688ba903..56466f78408fe24602564f9c62e14a8f5e31add5 100644 (file)
@@ -58,17 +58,7 @@ final class LocalTransactionChain extends AbstractSnapshotBackedTransactionChain
 
     @Override
     protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final DataTreeModification modification) {
-        return new LocalThreePhaseCommitCohort(parent.getActorContext(), leader, transaction, modification) {
-            @Override
-            protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
-                onTransactionFailed(transaction, ABORTED);
-            }
-
-            @Override
-            protected void transactionCommitted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
-                onTransactionCommited(transaction);
-            }
-        };
+        return new LocalChainThreePhaseCommitCohort(transaction, modification);
     }
 
     @Override
@@ -85,4 +75,39 @@ final class LocalTransactionChain extends AbstractSnapshotBackedTransactionChain
     public DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier) {
         return super.newWriteOnlyTransaction(identifier);
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public LocalThreePhaseCommitCohort onTransactionReady(DOMStoreWriteTransaction tx) {
+        try {
+            return (LocalThreePhaseCommitCohort) tx.ready();
+        } catch (Exception e) {
+            // Unfortunately we need to cast to SnapshotBackedWriteTransaction here as it's required by
+            // LocalThreePhaseCommitCohort and the base class.
+            return new LocalChainThreePhaseCommitCohort((SnapshotBackedWriteTransaction<TransactionIdentifier>)tx, e);
+        }
+    }
+
+    private class LocalChainThreePhaseCommitCohort extends LocalThreePhaseCommitCohort {
+
+        protected LocalChainThreePhaseCommitCohort(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
+                DataTreeModification modification) {
+            super(parent.getActorContext(), leader, transaction, modification);
+        }
+
+        protected LocalChainThreePhaseCommitCohort(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
+                Exception operationError) {
+            super(parent.getActorContext(), leader, transaction, operationError);
+        }
+
+        @Override
+        protected void transactionAborted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+            onTransactionFailed(transaction, ABORTED);
+        }
+
+        @Override
+        protected void transactionCommitted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+            onTransactionCommited(transaction);
+        }
+    }
 }
index e62d15b7ffc2bb05dca59e0fdae16739e0e778e3..276523e680a6389210edf290fff1a32fff252f23 100644 (file)
@@ -29,10 +29,14 @@ import scala.concurrent.Future;
  */
 abstract class LocalTransactionContext extends AbstractTransactionContext {
     private final DOMStoreTransaction txDelegate;
+    private final LocalTransactionReadySupport readySupport;
+    private Exception operationError;
 
-    LocalTransactionContext(DOMStoreTransaction txDelegate, TransactionIdentifier identifier) {
+    LocalTransactionContext(DOMStoreTransaction txDelegate, TransactionIdentifier identifier,
+            LocalTransactionReadySupport readySupport) {
         super(identifier);
         this.txDelegate = Preconditions.checkNotNull(txDelegate);
+        this.readySupport = readySupport;
     }
 
     protected abstract DOMStoreWriteTransaction getWriteDelegate();
@@ -42,19 +46,38 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
-        getWriteDelegate().write(path, data);
+        if(operationError == null) {
+            try {
+                getWriteDelegate().write(path, data);
+            } catch (Exception e) {
+                operationError = e;
+            }
+        }
+
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
-        getWriteDelegate().merge(path, data);
+        if(operationError == null) {
+            try {
+                getWriteDelegate().merge(path, data);
+            } catch (Exception e) {
+                operationError = e;
+            }
+        }
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         incrementModificationCount();
-        getWriteDelegate().delete(path);
+        if(operationError == null) {
+            try {
+                getWriteDelegate().delete(path);
+            } catch (Exception e) {
+                operationError = e;
+            }
+        }
     }
 
     @Override
@@ -89,7 +112,9 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
 
     private LocalThreePhaseCommitCohort ready() {
         logModificationCount();
-        return (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
+        LocalThreePhaseCommitCohort cohort = readySupport.onTransactionReady(getWriteDelegate());
+        cohort.setOperationError(operationError);
+        return cohort;
     }
 
     @Override
index d574e83401d31ffed21891b0852489e562fcd727..ff0ef76cec34084097442010c6702c93b9c7b582 100644 (file)
@@ -18,7 +18,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
  *
  * @author Thomas Pantelis
  */
-interface LocalTransactionFactory {
+interface LocalTransactionFactory extends LocalTransactionReadySupport {
     DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier);
 
     DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier);
index 149b9370ecba75bd264e2f50c8d5cdfd89a4b39a..2f4474b5d7350071b90244e2297f3917d7fbe180 100644 (file)
@@ -20,8 +20,6 @@ import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransact
 import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * {@link LocalTransactionFactory} for instantiating backing transactions which are
@@ -31,7 +29,6 @@ import org.slf4j.LoggerFactory;
 final class LocalTransactionFactoryImpl extends TransactionReadyPrototype<TransactionIdentifier>
         implements LocalTransactionFactory {
 
-    private static final Logger LOG = LoggerFactory.getLogger(LocalTransactionFactoryImpl.class);
     private final ActorSelection leader;
     private final DataTree dataTree;
     private final ActorContext actorContext;
@@ -69,18 +66,19 @@ final class LocalTransactionFactoryImpl extends TransactionReadyPrototype<Transa
     @Override
     protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<TransactionIdentifier> tx,
             final DataTreeModification tree) {
-        return new LocalThreePhaseCommitCohort(actorContext, leader, tx, tree) {
-            @Override
-            protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
-                // No-op
-                LOG.debug("Transaction {} aborted", transaction);
-            }
+        return new LocalThreePhaseCommitCohort(actorContext, leader, tx, tree);
+    }
 
-            @Override
-            protected void transactionCommitted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
-                // No-op
-                LOG.debug("Transaction {} committed", transaction);
-            }
-        };
+    @SuppressWarnings("unchecked")
+    @Override
+    public LocalThreePhaseCommitCohort onTransactionReady(DOMStoreWriteTransaction tx) {
+        try {
+            return (LocalThreePhaseCommitCohort) tx.ready();
+        } catch (Exception e) {
+            // Unfortunately we need to cast to SnapshotBackedWriteTransaction here as it's required by
+            // LocalThreePhaseCommitCohort.
+            return new LocalThreePhaseCommitCohort(actorContext, leader,
+                    (SnapshotBackedWriteTransaction<TransactionIdentifier>)tx, e);
+        }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionReadySupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionReadySupport.java
new file mode 100644 (file)
index 0000000..e03f3d2
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+
+/**
+ * Interface for a class that can "ready" a transaction.
+ *
+ * @author Thomas Pantelis
+ */
+interface LocalTransactionReadySupport {
+    LocalThreePhaseCommitCohort onTransactionReady(DOMStoreWriteTransaction tx);
+}
index 7d806890448ff783be319bcb66c3086f3d8f2e00..53f27061ae527404730d97224ae15651ea803b97 100644 (file)
@@ -184,7 +184,13 @@ class ShardCommitCoordinator {
         cohortEntry.applyModifications(batched.getModifications());
 
         if(batched.isReady()) {
+            if(cohortEntry.getLastBatchedModificationsException() != null) {
+                cohortCache.remove(cohortEntry.getTransactionID());
+                throw cohortEntry.getLastBatchedModificationsException();
+            }
+
             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+                cohortCache.remove(cohortEntry.getTransactionID());
                 throw new IllegalStateException(String.format(
                         "The total number of batched messages received %d does not match the number sent %d",
                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
@@ -495,6 +501,7 @@ class ShardCommitCoordinator {
         private final String transactionID;
         private ShardDataTreeCohort cohort;
         private final ReadWriteShardDataTreeTransaction transaction;
+        private RuntimeException lastBatchedModificationsException;
         private ActorRef replySender;
         private Shard shard;
         private boolean doImmediateCommit;
@@ -536,12 +543,22 @@ class ShardCommitCoordinator {
             return totalBatchedModificationsReceived;
         }
 
-        void applyModifications(Iterable<Modification> modifications) {
-            for (Modification modification : modifications) {
-                modification.apply(transaction.getSnapshot());
-            }
+        RuntimeException getLastBatchedModificationsException() {
+            return lastBatchedModificationsException;
+        }
 
+        void applyModifications(Iterable<Modification> modifications) {
             totalBatchedModificationsReceived++;
+            if(lastBatchedModificationsException == null) {
+                for (Modification modification : modifications) {
+                        try {
+                            modification.apply(transaction.getSnapshot());
+                        } catch (RuntimeException e) {
+                            lastBatchedModificationsException = e;
+                            throw e;
+                        }
+                }
+            }
         }
 
         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
index 5f21ee1caebc2cdf090349289b401bb8fa303905..2319c5be384326a61ef8fb0d5c7519d2e812c498 100644 (file)
@@ -2,6 +2,11 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
@@ -51,10 +56,12 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 public class DistributedDataStoreIntegrationTest {
 
@@ -963,6 +970,82 @@ public class DistributedDataStoreIntegrationTest {
         }};
     }
 
+    @Test
+    public void testChainedTransactionFailureWithSingleShard() throws Exception{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+            DistributedDataStore dataStore = setupDistributedDataStore(
+                    "testChainedTransactionFailureWithSingleShard", "cars-1");
+
+            ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+                    ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+                            LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
+
+            TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+            DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+            DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+
+            ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
+                        withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+            rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+
+            try {
+                rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
+                fail("Expected TransactionCommitFailedException");
+            } catch (TransactionCommitFailedException e) {
+                // Expected
+            }
+
+            verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
+
+            txChain.close();
+            broker.close();
+            cleanup(dataStore);
+        }};
+    }
+
+    @Test
+    public void testChainedTransactionFailureWithMultipleShards() throws Exception{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+            DistributedDataStore dataStore = setupDistributedDataStore(
+                    "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
+
+            ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+                    ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+                            LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
+
+            TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+            DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+            DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+            writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+            ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
+                        withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+            // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
+            // done for put for performance reasons.
+            writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+
+            try {
+                writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+                fail("Expected TransactionCommitFailedException");
+            } catch (TransactionCommitFailedException e) {
+                // Expected
+            }
+
+            verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
+
+            txChain.close();
+            broker.close();
+            cleanup(dataStore);
+        }};
+    }
+
     @Test
     public void testChangeListenerRegistration() throws Exception{
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
index 53b904ea36f103a2fad0d3af9f80e7a00d0ab57a..672d8f3b9997cc866d8320c11da3a7d55aa61e49 100644 (file)
@@ -9,6 +9,11 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
@@ -16,12 +21,15 @@ import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.ConfigFactory;
 import java.math.BigInteger;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
@@ -31,12 +39,20 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -44,6 +60,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 
 /**
@@ -357,6 +374,76 @@ public class DistributedDataStoreRemotingIntegrationTest {
         assertEquals("isPresent", false, optional.isPresent());
     }
 
+    @Test
+    public void testChainedTransactionFailureWithSingleShard() throws Exception {
+        initDatastores("testChainedTransactionFailureWithSingleShard");
+
+        ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+                ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+                        LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
+                        MoreExecutors.directExecutor());
+
+        TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+        DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+        DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+        ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
+                    withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+        writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+
+        try {
+            writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+            fail("Expected TransactionCommitFailedException");
+        } catch (TransactionCommitFailedException e) {
+            // Expected
+        }
+
+        verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
+
+        txChain.close();
+        broker.close();
+    }
+
+    @Test
+    public void testChainedTransactionFailureWithMultipleShards() throws Exception {
+        initDatastores("testChainedTransactionFailureWithMultipleShards");
+
+        ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+                ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+                        LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
+                        MoreExecutors.directExecutor());
+
+        TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+        DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+        DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+        ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
+                    withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+        // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
+        // done for put for performance reasons.
+        writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+
+        try {
+            writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+            fail("Expected TransactionCommitFailedException");
+        } catch (TransactionCommitFailedException e) {
+            // Expected
+        }
+
+        verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
+
+        txChain.close();
+        broker.close();
+    }
+
     @Test
     public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
         String testName = "testSingleShardTransactionsWithLeaderChanges";
index 0545bbae360ea0be22f1fc19fe7906b4e7a3220a..e2d6127708171cfdb05ed562f04e7329e7b8834c 100644 (file)
@@ -1,24 +1,25 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import akka.dispatch.ExecutionContexts;
+import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.Future;
 
 public class LocalTransactionContextTest {
 
@@ -31,12 +32,15 @@ public class LocalTransactionContextTest {
     @Mock
     DOMStoreReadWriteTransaction readWriteTransaction;
 
+    @Mock
+    LocalTransactionReadySupport mockReadySupport;
+
     LocalTransactionContext localTransactionContext;
 
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier()) {
+        localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier(), mockReadySupport) {
             @Override
             protected DOMStoreWriteTransaction getWriteDelegate() {
                 return readWriteTransaction;
@@ -93,14 +97,66 @@ public class LocalTransactionContextTest {
     @Test
     public void testReady() {
         final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
-        final ActorContext mockContext = mock(ActorContext.class);
-        doReturn(mockContext).when(mockCohort).getActorContext();
-        doReturn(ExecutionContexts.fromExecutor(MoreExecutors.directExecutor())).when(mockContext).getClientDispatcher();
         doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
-        doReturn(mockCohort).when(readWriteTransaction).ready();
-        localTransactionContext.readyTransaction();
-        verify(readWriteTransaction).ready();
+        doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction);
+
+        Future<ActorSelection> future = localTransactionContext.readyTransaction();
+        assertTrue(future.isCompleted());
+
+        verify(mockReadySupport).onTransactionReady(readWriteTransaction);
+    }
+
+    @Test
+    public void testReadyWithWriteError() {
+        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+        NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
+        RuntimeException error = new RuntimeException("mock");
+        doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
+
+        localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
+        localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
+
+        verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
+
+        doReadyWithExpectedError(error);
     }
 
+    @Test
+    public void testReadyWithMergeError() {
+        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+        NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
+        RuntimeException error = new RuntimeException("mock");
+        doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
 
+        localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
+        localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
+
+        verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
+
+        doReadyWithExpectedError(error);
+    }
+
+    @Test
+    public void testReadyWithDeleteError() {
+        YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+        RuntimeException error = new RuntimeException("mock");
+        doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
+
+        localTransactionContext.deleteData(yangInstanceIdentifier);
+        localTransactionContext.deleteData(yangInstanceIdentifier);
+
+        verify(readWriteTransaction).delete(yangInstanceIdentifier);
+
+        doReadyWithExpectedError(error);
+    }
+
+    private void doReadyWithExpectedError(RuntimeException expError) {
+        LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
+        doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
+        doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction);
+
+        localTransactionContext.readyTransaction();
+
+        verify(mockCohort).setOperationError(expError);
+    }
 }
index bd4d8ec58855619704be9ce5dc595fe7896f26cf..83b15b99df43f74e0a8a5d61d7493b1792c17e8b 100644 (file)
@@ -11,7 +11,6 @@ import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
@@ -988,6 +987,44 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test
+    public void testBatchedModificationsWithOperationFailure() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testBatchedModificationsWithOperationFailure");
+
+            waitUntilLeader(shard);
+
+            // Test merge with invalid data. An exception should occur when the merge is applied. Note that
+            // write will not validate the children for performance reasons.
+
+            String transactionID = "tx1";
+
+            ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                        withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+            BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
+            batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
+            shard.tell(batched, getRef());
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+            Throwable cause = failure.cause();
+
+            batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            shard.tell(batched, getRef());
+
+            failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+            assertEquals("Failure cause", cause, failure.cause());
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @SuppressWarnings("unchecked")
     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);