From: Tom Pantelis Date: Mon, 15 Jun 2015 23:38:06 +0000 (-0400) Subject: Bug 3195: Cleanup on error paths and error handling X-Git-Tag: release/beryllium~424 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=340a2d4c979ac6f8d5adff8bd9e1c9f724e7a164 Bug 3195: Cleanup on error paths and error handling Modified ShardCommitCoordinator#handleBatchedModifications to remove the cohortEntry from the cache if the total messages sent doesn't match the total received. With recent yangtools changes, write and merge on a DOM tx can throw unchecked exceptions if he data is invalid. Modified the front-end local tx path to catch unchecked exceptions in LocalTransactionContext and propagate to the LocalThreePhaseCommit to immediately fail the ready. Similarly modified ShardCommitCoordinator#handleBatchedModifications to handle unchecked exceptions from applied operations and propagate when the tx is readied. Added unit tests to cover these cases. Also, modified LocalTransactionContext#readyTransaction to handle unchecked exceptions from the DOM tx ready. Change-Id: Ib6fe6e04b8626bf996cfabfe74da780f05ce838a Signed-off-by: Tom Pantelis (cherry picked from commit 3cd149e98ed5260b46006ff474bebe96f198756f) --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java index 1e085523fd..5f9cc4a0d2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java @@ -181,7 +181,7 @@ abstract class AbstractTransactionContextFactory transaction; private final DataTreeModification modification; private final ActorContext actorContext; private final ActorSelection leader; + private Exception operationError; protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader, final SnapshotBackedWriteTransaction transaction, final DataTreeModification modification) { @@ -46,19 +47,27 @@ abstract class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCo this.modification = Preconditions.checkNotNull(modification); } + protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader, + final SnapshotBackedWriteTransaction transaction, final Exception operationError) { + this.actorContext = Preconditions.checkNotNull(actorContext); + this.leader = Preconditions.checkNotNull(leader); + this.transaction = Preconditions.checkNotNull(transaction); + this.operationError = Preconditions.checkNotNull(operationError); + this.modification = null; + } + private Future initiateCommit(final boolean immediate) { + if(operationError != null) { + return Futures.failed(operationError); + } + final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier().toString(), modification, immediate); return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout()); } - /** - * Return the {@link ActorContext} associated with this object. - * - * @return An actor context instance. - */ - @Nonnull ActorContext getActorContext() { - return actorContext; + void setOperationError(Exception operationError) { + this.operationError = operationError; } Future initiateCoordinatedCommit() { @@ -126,6 +135,9 @@ abstract class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCo throw new UnsupportedOperationException(); } - protected abstract void transactionAborted(SnapshotBackedWriteTransaction transaction); - protected abstract void transactionCommitted(SnapshotBackedWriteTransaction transaction); + protected void transactionAborted(SnapshotBackedWriteTransaction transaction) { + } + + protected void transactionCommitted(SnapshotBackedWriteTransaction transaction) { + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java index 39d0133d02..56466f7840 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java @@ -58,17 +58,7 @@ final class LocalTransactionChain extends AbstractSnapshotBackedTransactionChain @Override protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction transaction, final DataTreeModification modification) { - return new LocalThreePhaseCommitCohort(parent.getActorContext(), leader, transaction, modification) { - @Override - protected void transactionAborted(final SnapshotBackedWriteTransaction transaction) { - onTransactionFailed(transaction, ABORTED); - } - - @Override - protected void transactionCommitted(final SnapshotBackedWriteTransaction transaction) { - onTransactionCommited(transaction); - } - }; + return new LocalChainThreePhaseCommitCohort(transaction, modification); } @Override @@ -85,4 +75,39 @@ final class LocalTransactionChain extends AbstractSnapshotBackedTransactionChain public DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier) { return super.newWriteOnlyTransaction(identifier); } + + @SuppressWarnings("unchecked") + @Override + public LocalThreePhaseCommitCohort onTransactionReady(DOMStoreWriteTransaction tx) { + try { + return (LocalThreePhaseCommitCohort) tx.ready(); + } catch (Exception e) { + // Unfortunately we need to cast to SnapshotBackedWriteTransaction here as it's required by + // LocalThreePhaseCommitCohort and the base class. + return new LocalChainThreePhaseCommitCohort((SnapshotBackedWriteTransaction)tx, e); + } + } + + private class LocalChainThreePhaseCommitCohort extends LocalThreePhaseCommitCohort { + + protected LocalChainThreePhaseCommitCohort(SnapshotBackedWriteTransaction transaction, + DataTreeModification modification) { + super(parent.getActorContext(), leader, transaction, modification); + } + + protected LocalChainThreePhaseCommitCohort(SnapshotBackedWriteTransaction transaction, + Exception operationError) { + super(parent.getActorContext(), leader, transaction, operationError); + } + + @Override + protected void transactionAborted(SnapshotBackedWriteTransaction transaction) { + onTransactionFailed(transaction, ABORTED); + } + + @Override + protected void transactionCommitted(SnapshotBackedWriteTransaction transaction) { + onTransactionCommited(transaction); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java index e62d15b7ff..276523e680 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java @@ -29,10 +29,14 @@ import scala.concurrent.Future; */ abstract class LocalTransactionContext extends AbstractTransactionContext { private final DOMStoreTransaction txDelegate; + private final LocalTransactionReadySupport readySupport; + private Exception operationError; - LocalTransactionContext(DOMStoreTransaction txDelegate, TransactionIdentifier identifier) { + LocalTransactionContext(DOMStoreTransaction txDelegate, TransactionIdentifier identifier, + LocalTransactionReadySupport readySupport) { super(identifier); this.txDelegate = Preconditions.checkNotNull(txDelegate); + this.readySupport = readySupport; } protected abstract DOMStoreWriteTransaction getWriteDelegate(); @@ -42,19 +46,38 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { incrementModificationCount(); - getWriteDelegate().write(path, data); + if(operationError == null) { + try { + getWriteDelegate().write(path, data); + } catch (Exception e) { + operationError = e; + } + } + } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { incrementModificationCount(); - getWriteDelegate().merge(path, data); + if(operationError == null) { + try { + getWriteDelegate().merge(path, data); + } catch (Exception e) { + operationError = e; + } + } } @Override public void deleteData(YangInstanceIdentifier path) { incrementModificationCount(); - getWriteDelegate().delete(path); + if(operationError == null) { + try { + getWriteDelegate().delete(path); + } catch (Exception e) { + operationError = e; + } + } } @Override @@ -89,7 +112,9 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { private LocalThreePhaseCommitCohort ready() { logModificationCount(); - return (LocalThreePhaseCommitCohort) getWriteDelegate().ready(); + LocalThreePhaseCommitCohort cohort = readySupport.onTransactionReady(getWriteDelegate()); + cohort.setOperationError(operationError); + return cohort; } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java index d574e83401..ff0ef76cec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java @@ -18,7 +18,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; * * @author Thomas Pantelis */ -interface LocalTransactionFactory { +interface LocalTransactionFactory extends LocalTransactionReadySupport { DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier); DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java index 149b9370ec..2f4474b5d7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java @@ -20,8 +20,6 @@ import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransact import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * {@link LocalTransactionFactory} for instantiating backing transactions which are @@ -31,7 +29,6 @@ import org.slf4j.LoggerFactory; final class LocalTransactionFactoryImpl extends TransactionReadyPrototype implements LocalTransactionFactory { - private static final Logger LOG = LoggerFactory.getLogger(LocalTransactionFactoryImpl.class); private final ActorSelection leader; private final DataTree dataTree; private final ActorContext actorContext; @@ -69,18 +66,19 @@ final class LocalTransactionFactoryImpl extends TransactionReadyPrototype tx, final DataTreeModification tree) { - return new LocalThreePhaseCommitCohort(actorContext, leader, tx, tree) { - @Override - protected void transactionAborted(final SnapshotBackedWriteTransaction transaction) { - // No-op - LOG.debug("Transaction {} aborted", transaction); - } + return new LocalThreePhaseCommitCohort(actorContext, leader, tx, tree); + } - @Override - protected void transactionCommitted(final SnapshotBackedWriteTransaction transaction) { - // No-op - LOG.debug("Transaction {} committed", transaction); - } - }; + @SuppressWarnings("unchecked") + @Override + public LocalThreePhaseCommitCohort onTransactionReady(DOMStoreWriteTransaction tx) { + try { + return (LocalThreePhaseCommitCohort) tx.ready(); + } catch (Exception e) { + // Unfortunately we need to cast to SnapshotBackedWriteTransaction here as it's required by + // LocalThreePhaseCommitCohort. + return new LocalThreePhaseCommitCohort(actorContext, leader, + (SnapshotBackedWriteTransaction)tx, e); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionReadySupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionReadySupport.java new file mode 100644 index 0000000000..e03f3d2433 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionReadySupport.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; + +/** + * Interface for a class that can "ready" a transaction. + * + * @author Thomas Pantelis + */ +interface LocalTransactionReadySupport { + LocalThreePhaseCommitCohort onTransactionReady(DOMStoreWriteTransaction tx); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 7d80689044..53f27061ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -184,7 +184,13 @@ class ShardCommitCoordinator { cohortEntry.applyModifications(batched.getModifications()); if(batched.isReady()) { + if(cohortEntry.getLastBatchedModificationsException() != null) { + cohortCache.remove(cohortEntry.getTransactionID()); + throw cohortEntry.getLastBatchedModificationsException(); + } + if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) { + cohortCache.remove(cohortEntry.getTransactionID()); throw new IllegalStateException(String.format( "The total number of batched messages received %d does not match the number sent %d", cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent())); @@ -495,6 +501,7 @@ class ShardCommitCoordinator { private final String transactionID; private ShardDataTreeCohort cohort; private final ReadWriteShardDataTreeTransaction transaction; + private RuntimeException lastBatchedModificationsException; private ActorRef replySender; private Shard shard; private boolean doImmediateCommit; @@ -536,12 +543,22 @@ class ShardCommitCoordinator { return totalBatchedModificationsReceived; } - void applyModifications(Iterable modifications) { - for (Modification modification : modifications) { - modification.apply(transaction.getSnapshot()); - } + RuntimeException getLastBatchedModificationsException() { + return lastBatchedModificationsException; + } + void applyModifications(Iterable modifications) { totalBatchedModificationsReceived++; + if(lastBatchedModificationsException == null) { + for (Modification modification : modifications) { + try { + modification.apply(transaction.getSnapshot()); + } catch (RuntimeException e) { + lastBatchedModificationsException = e; + throw e; + } + } + } } void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 5f21ee1cae..2319c5be38 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -2,6 +2,11 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; @@ -51,10 +56,12 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; public class DistributedDataStoreIntegrationTest { @@ -963,6 +970,82 @@ public class DistributedDataStoreIntegrationTest { }}; } + @Test + public void testChainedTransactionFailureWithSingleShard() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testChainedTransactionFailureWithSingleShard", "cars-1"); + + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder().put( + LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); + + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); + + DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + + ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + + rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + + try { + rwTx.submit().checkedGet(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (TransactionCommitFailedException e) { + // Expected + } + + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class)); + + txChain.close(); + broker.close(); + cleanup(dataStore); + }}; + } + + @Test + public void testChainedTransactionFailureWithMultipleShards() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1"); + + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder().put( + LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); + + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); + + DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + + writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + + // Note that merge will validate the data and fail but put succeeds b/c deep validation is not + // done for put for performance reasons. + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + + try { + writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (TransactionCommitFailedException e) { + // Expected + } + + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); + + txChain.close(); + broker.close(); + cleanup(dataStore); + }}; + } + @Test public void testChangeListenerRegistration() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 53b904ea36..672d8f3b99 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -9,6 +9,11 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; @@ -16,12 +21,15 @@ import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.ConfigFactory; import java.math.BigInteger; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -31,12 +39,20 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -44,6 +60,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; /** @@ -357,6 +374,76 @@ public class DistributedDataStoreRemotingIntegrationTest { assertEquals("isPresent", false, optional.isPresent()); } + @Test + public void testChainedTransactionFailureWithSingleShard() throws Exception { + initDatastores("testChainedTransactionFailureWithSingleShard"); + + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder().put( + LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), + MoreExecutors.directExecutor()); + + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); + + DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + + ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + + try { + writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (TransactionCommitFailedException e) { + // Expected + } + + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); + + txChain.close(); + broker.close(); + } + + @Test + public void testChainedTransactionFailureWithMultipleShards() throws Exception { + initDatastores("testChainedTransactionFailureWithMultipleShards"); + + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder().put( + LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(), + MoreExecutors.directExecutor()); + + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); + + DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + + writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + + // Note that merge will validate the data and fail but put succeeds b/c deep validation is not + // done for put for performance reasons. + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + + try { + writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (TransactionCommitFailedException e) { + // Expected + } + + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); + + txChain.close(); + broker.close(); + } + @Test public void testSingleShardTransactionsWithLeaderChanges() throws Exception { String testName = "testSingleShardTransactionsWithLeaderChanges"; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java index 0545bbae36..e2d6127708 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java @@ -1,24 +1,25 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import akka.dispatch.ExecutionContexts; +import akka.actor.ActorSelection; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import scala.concurrent.Future; public class LocalTransactionContextTest { @@ -31,12 +32,15 @@ public class LocalTransactionContextTest { @Mock DOMStoreReadWriteTransaction readWriteTransaction; + @Mock + LocalTransactionReadySupport mockReadySupport; + LocalTransactionContext localTransactionContext; @Before public void setUp() { MockitoAnnotations.initMocks(this); - localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier()) { + localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier(), mockReadySupport) { @Override protected DOMStoreWriteTransaction getWriteDelegate() { return readWriteTransaction; @@ -93,14 +97,66 @@ public class LocalTransactionContextTest { @Test public void testReady() { final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class); - final ActorContext mockContext = mock(ActorContext.class); - doReturn(mockContext).when(mockCohort).getActorContext(); - doReturn(ExecutionContexts.fromExecutor(MoreExecutors.directExecutor())).when(mockContext).getClientDispatcher(); doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(); - doReturn(mockCohort).when(readWriteTransaction).ready(); - localTransactionContext.readyTransaction(); - verify(readWriteTransaction).ready(); + doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction); + + Future future = localTransactionContext.readyTransaction(); + assertTrue(future.isCompleted()); + + verify(mockReadySupport).onTransactionReady(readWriteTransaction); + } + + @Test + public void testReadyWithWriteError() { + YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); + NormalizedNode normalizedNode = mock(NormalizedNode.class); + RuntimeException error = new RuntimeException("mock"); + doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode); + + localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode); + localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode); + + verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode); + + doReadyWithExpectedError(error); } + @Test + public void testReadyWithMergeError() { + YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); + NormalizedNode normalizedNode = mock(NormalizedNode.class); + RuntimeException error = new RuntimeException("mock"); + doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode); + localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode); + localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode); + + verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode); + + doReadyWithExpectedError(error); + } + + @Test + public void testReadyWithDeleteError() { + YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); + RuntimeException error = new RuntimeException("mock"); + doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier); + + localTransactionContext.deleteData(yangInstanceIdentifier); + localTransactionContext.deleteData(yangInstanceIdentifier); + + verify(readWriteTransaction).delete(yangInstanceIdentifier); + + doReadyWithExpectedError(error); + } + + private void doReadyWithExpectedError(RuntimeException expError) { + LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class); + doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(); + doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction); + + localTransactionContext.readyTransaction(); + + verify(mockCohort).setOperationError(expError); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index bd4d8ec588..83b15b99df 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -11,7 +11,6 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; - import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; @@ -988,6 +987,44 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testBatchedModificationsWithOperationFailure() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testBatchedModificationsWithOperationFailure"); + + waitUntilLeader(shard); + + // Test merge with invalid data. An exception should occur when the merge is applied. Note that + // write will not validate the children for performance reasons. + + String transactionID = "tx1"; + + ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + + BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null); + batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData)); + shard.tell(batched, getRef()); + Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + + Throwable cause = failure.cause(); + + batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + + shard.tell(batched, getRef()); + + failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + assertEquals("Failure cause", cause, failure.cause()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + @SuppressWarnings("unchecked") private static void verifyOuterListEntry(final TestActorRef shard, final Object expIDValue) throws Exception { final NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH);