switch(parent.getType()) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(readOnly, parent.getIdentifier()) {
+ return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
throw new UnsupportedOperationException();
};
case READ_WRITE:
final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
- return new LocalTransactionContext(readWrite, parent.getIdentifier()) {
+ return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWrite;
};
case WRITE_ONLY:
final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(writeOnly, parent.getIdentifier()) {
+ return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return writeOnly;
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
-import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
* It is not actually called by the front-end to perform 3PC thus the canCommit/preCommit/commit methods
* are no-ops.
*/
-abstract class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
private static final Logger LOG = LoggerFactory.getLogger(LocalThreePhaseCommitCohort.class);
private final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction;
private final DataTreeModification modification;
private final ActorContext actorContext;
private final ActorSelection leader;
+ private Exception operationError;
protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final DataTreeModification modification) {
this.modification = Preconditions.checkNotNull(modification);
}
+ protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
+ final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final Exception operationError) {
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.leader = Preconditions.checkNotNull(leader);
+ this.transaction = Preconditions.checkNotNull(transaction);
+ this.operationError = Preconditions.checkNotNull(operationError);
+ this.modification = null;
+ }
+
private Future<Object> initiateCommit(final boolean immediate) {
+ if(operationError != null) {
+ return Futures.failed(operationError);
+ }
+
final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier().toString(),
modification, immediate);
return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
}
- /**
- * Return the {@link ActorContext} associated with this object.
- *
- * @return An actor context instance.
- */
- @Nonnull ActorContext getActorContext() {
- return actorContext;
+ void setOperationError(Exception operationError) {
+ this.operationError = operationError;
}
Future<ActorSelection> initiateCoordinatedCommit() {
throw new UnsupportedOperationException();
}
- protected abstract void transactionAborted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction);
- protected abstract void transactionCommitted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction);
+ protected void transactionAborted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+ }
+
+ protected void transactionCommitted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+ }
}
@Override
protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final DataTreeModification modification) {
- return new LocalThreePhaseCommitCohort(parent.getActorContext(), leader, transaction, modification) {
- @Override
- protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
- onTransactionFailed(transaction, ABORTED);
- }
-
- @Override
- protected void transactionCommitted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
- onTransactionCommited(transaction);
- }
- };
+ return new LocalChainThreePhaseCommitCohort(transaction, modification);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier) {
return super.newWriteOnlyTransaction(identifier);
}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public LocalThreePhaseCommitCohort onTransactionReady(DOMStoreWriteTransaction tx) {
+ try {
+ return (LocalThreePhaseCommitCohort) tx.ready();
+ } catch (Exception e) {
+ // Unfortunately we need to cast to SnapshotBackedWriteTransaction here as it's required by
+ // LocalThreePhaseCommitCohort and the base class.
+ return new LocalChainThreePhaseCommitCohort((SnapshotBackedWriteTransaction<TransactionIdentifier>)tx, e);
+ }
+ }
+
+ private class LocalChainThreePhaseCommitCohort extends LocalThreePhaseCommitCohort {
+
+ protected LocalChainThreePhaseCommitCohort(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
+ DataTreeModification modification) {
+ super(parent.getActorContext(), leader, transaction, modification);
+ }
+
+ protected LocalChainThreePhaseCommitCohort(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
+ Exception operationError) {
+ super(parent.getActorContext(), leader, transaction, operationError);
+ }
+
+ @Override
+ protected void transactionAborted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+ onTransactionFailed(transaction, ABORTED);
+ }
+
+ @Override
+ protected void transactionCommitted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+ onTransactionCommited(transaction);
+ }
+ }
}
*/
abstract class LocalTransactionContext extends AbstractTransactionContext {
private final DOMStoreTransaction txDelegate;
+ private final LocalTransactionReadySupport readySupport;
+ private Exception operationError;
- LocalTransactionContext(DOMStoreTransaction txDelegate, TransactionIdentifier identifier) {
+ LocalTransactionContext(DOMStoreTransaction txDelegate, TransactionIdentifier identifier,
+ LocalTransactionReadySupport readySupport) {
super(identifier);
this.txDelegate = Preconditions.checkNotNull(txDelegate);
+ this.readySupport = readySupport;
}
protected abstract DOMStoreWriteTransaction getWriteDelegate();
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
- getWriteDelegate().write(path, data);
+ if(operationError == null) {
+ try {
+ getWriteDelegate().write(path, data);
+ } catch (Exception e) {
+ operationError = e;
+ }
+ }
+
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
- getWriteDelegate().merge(path, data);
+ if(operationError == null) {
+ try {
+ getWriteDelegate().merge(path, data);
+ } catch (Exception e) {
+ operationError = e;
+ }
+ }
}
@Override
public void deleteData(YangInstanceIdentifier path) {
incrementModificationCount();
- getWriteDelegate().delete(path);
+ if(operationError == null) {
+ try {
+ getWriteDelegate().delete(path);
+ } catch (Exception e) {
+ operationError = e;
+ }
+ }
}
@Override
private LocalThreePhaseCommitCohort ready() {
logModificationCount();
- return (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
+ LocalThreePhaseCommitCohort cohort = readySupport.onTransactionReady(getWriteDelegate());
+ cohort.setOperationError(operationError);
+ return cohort;
}
@Override
*
* @author Thomas Pantelis
*/
-interface LocalTransactionFactory {
+interface LocalTransactionFactory extends LocalTransactionReadySupport {
DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier);
DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier);
import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* {@link LocalTransactionFactory} for instantiating backing transactions which are
final class LocalTransactionFactoryImpl extends TransactionReadyPrototype<TransactionIdentifier>
implements LocalTransactionFactory {
- private static final Logger LOG = LoggerFactory.getLogger(LocalTransactionFactoryImpl.class);
private final ActorSelection leader;
private final DataTree dataTree;
private final ActorContext actorContext;
@Override
protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<TransactionIdentifier> tx,
final DataTreeModification tree) {
- return new LocalThreePhaseCommitCohort(actorContext, leader, tx, tree) {
- @Override
- protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
- // No-op
- LOG.debug("Transaction {} aborted", transaction);
- }
+ return new LocalThreePhaseCommitCohort(actorContext, leader, tx, tree);
+ }
- @Override
- protected void transactionCommitted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
- // No-op
- LOG.debug("Transaction {} committed", transaction);
- }
- };
+ @SuppressWarnings("unchecked")
+ @Override
+ public LocalThreePhaseCommitCohort onTransactionReady(DOMStoreWriteTransaction tx) {
+ try {
+ return (LocalThreePhaseCommitCohort) tx.ready();
+ } catch (Exception e) {
+ // Unfortunately we need to cast to SnapshotBackedWriteTransaction here as it's required by
+ // LocalThreePhaseCommitCohort.
+ return new LocalThreePhaseCommitCohort(actorContext, leader,
+ (SnapshotBackedWriteTransaction<TransactionIdentifier>)tx, e);
+ }
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+
+/**
+ * Interface for a class that can "ready" a transaction.
+ *
+ * @author Thomas Pantelis
+ */
+interface LocalTransactionReadySupport {
+ LocalThreePhaseCommitCohort onTransactionReady(DOMStoreWriteTransaction tx);
+}
cohortEntry.applyModifications(batched.getModifications());
if(batched.isReady()) {
+ if(cohortEntry.getLastBatchedModificationsException() != null) {
+ cohortCache.remove(cohortEntry.getTransactionID());
+ throw cohortEntry.getLastBatchedModificationsException();
+ }
+
if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+ cohortCache.remove(cohortEntry.getTransactionID());
throw new IllegalStateException(String.format(
"The total number of batched messages received %d does not match the number sent %d",
cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
private final String transactionID;
private ShardDataTreeCohort cohort;
private final ReadWriteShardDataTreeTransaction transaction;
+ private RuntimeException lastBatchedModificationsException;
private ActorRef replySender;
private Shard shard;
private boolean doImmediateCommit;
return totalBatchedModificationsReceived;
}
- void applyModifications(Iterable<Modification> modifications) {
- for (Modification modification : modifications) {
- modification.apply(transaction.getSnapshot());
- }
+ RuntimeException getLastBatchedModificationsException() {
+ return lastBatchedModificationsException;
+ }
+ void applyModifications(Iterable<Modification> modifications) {
totalBatchedModificationsReceived++;
+ if(lastBatchedModificationsException == null) {
+ for (Modification modification : modifications) {
+ try {
+ modification.apply(transaction.getSnapshot());
+ } catch (RuntimeException e) {
+ lastBatchedModificationsException = e;
+ throw e;
+ }
+ }
+ }
}
void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
public class DistributedDataStoreIntegrationTest {
}};
}
+ @Test
+ public void testChainedTransactionFailureWithSingleShard() throws Exception{
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+ DistributedDataStore dataStore = setupDistributedDataStore(
+ "testChainedTransactionFailureWithSingleShard", "cars-1");
+
+ ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+ ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+ LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
+
+ TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+ DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+ DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+
+ ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+ rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+
+ try {
+ rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected TransactionCommitFailedException");
+ } catch (TransactionCommitFailedException e) {
+ // Expected
+ }
+
+ verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
+
+ txChain.close();
+ broker.close();
+ cleanup(dataStore);
+ }};
+ }
+
+ @Test
+ public void testChainedTransactionFailureWithMultipleShards() throws Exception{
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+ DistributedDataStore dataStore = setupDistributedDataStore(
+ "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
+
+ ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+ ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+ LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
+
+ TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+ DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+ DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+ ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+ // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
+ // done for put for performance reasons.
+ writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+
+ try {
+ writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected TransactionCommitFailedException");
+ } catch (TransactionCommitFailedException e) {
+ // Expected
+ }
+
+ verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
+
+ txChain.close();
+ broker.close();
+ cleanup(dataStore);
+ }};
+ }
+
@Test
public void testChangeListenerRegistration() throws Exception{
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.ConfigFactory;
import java.math.BigInteger;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
/**
assertEquals("isPresent", false, optional.isPresent());
}
+ @Test
+ public void testChainedTransactionFailureWithSingleShard() throws Exception {
+ initDatastores("testChainedTransactionFailureWithSingleShard");
+
+ ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+ ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+ LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
+ MoreExecutors.directExecutor());
+
+ TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+ DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+ DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+ ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+ writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+
+ try {
+ writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected TransactionCommitFailedException");
+ } catch (TransactionCommitFailedException e) {
+ // Expected
+ }
+
+ verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
+
+ txChain.close();
+ broker.close();
+ }
+
+ @Test
+ public void testChainedTransactionFailureWithMultipleShards() throws Exception {
+ initDatastores("testChainedTransactionFailureWithMultipleShards");
+
+ ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+ ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+ LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
+ MoreExecutors.directExecutor());
+
+ TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+ DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+ DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+ ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+ // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
+ // done for put for performance reasons.
+ writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+
+ try {
+ writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected TransactionCommitFailedException");
+ } catch (TransactionCommitFailedException e) {
+ // Expected
+ }
+
+ verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
+
+ txChain.close();
+ broker.close();
+ }
+
@Test
public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
String testName = "testSingleShardTransactionsWithLeaderChanges";
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import akka.dispatch.ExecutionContexts;
+import akka.actor.ActorSelection;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.Future;
public class LocalTransactionContextTest {
@Mock
DOMStoreReadWriteTransaction readWriteTransaction;
+ @Mock
+ LocalTransactionReadySupport mockReadySupport;
+
LocalTransactionContext localTransactionContext;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier()) {
+ localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier(), mockReadySupport) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWriteTransaction;
@Test
public void testReady() {
final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
- final ActorContext mockContext = mock(ActorContext.class);
- doReturn(mockContext).when(mockCohort).getActorContext();
- doReturn(ExecutionContexts.fromExecutor(MoreExecutors.directExecutor())).when(mockContext).getClientDispatcher();
doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
- doReturn(mockCohort).when(readWriteTransaction).ready();
- localTransactionContext.readyTransaction();
- verify(readWriteTransaction).ready();
+ doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction);
+
+ Future<ActorSelection> future = localTransactionContext.readyTransaction();
+ assertTrue(future.isCompleted());
+
+ verify(mockReadySupport).onTransactionReady(readWriteTransaction);
+ }
+
+ @Test
+ public void testReadyWithWriteError() {
+ YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+ NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
+ RuntimeException error = new RuntimeException("mock");
+ doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
+
+ localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
+ localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
+
+ verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
+
+ doReadyWithExpectedError(error);
}
+ @Test
+ public void testReadyWithMergeError() {
+ YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+ NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
+ RuntimeException error = new RuntimeException("mock");
+ doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
+ localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
+ localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
+
+ verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
+
+ doReadyWithExpectedError(error);
+ }
+
+ @Test
+ public void testReadyWithDeleteError() {
+ YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+ RuntimeException error = new RuntimeException("mock");
+ doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
+
+ localTransactionContext.deleteData(yangInstanceIdentifier);
+ localTransactionContext.deleteData(yangInstanceIdentifier);
+
+ verify(readWriteTransaction).delete(yangInstanceIdentifier);
+
+ doReadyWithExpectedError(error);
+ }
+
+ private void doReadyWithExpectedError(RuntimeException expError) {
+ LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
+ doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
+ doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction);
+
+ localTransactionContext.readyTransaction();
+
+ verify(mockCohort).setOperationError(expError);
+ }
}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
}};
}
+ @Test
+ public void testBatchedModificationsWithOperationFailure() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testBatchedModificationsWithOperationFailure");
+
+ waitUntilLeader(shard);
+
+ // Test merge with invalid data. An exception should occur when the merge is applied. Note that
+ // write will not validate the children for performance reasons.
+
+ String transactionID = "tx1";
+
+ ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+
+ BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
+ batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
+ shard.tell(batched, getRef());
+ Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+ Throwable cause = failure.cause();
+
+ batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
+ batched.setReady(true);
+ batched.setTotalMessagesSent(2);
+
+ shard.tell(batched, getRef());
+
+ failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+ assertEquals("Failure cause", cause, failure.cause());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
@SuppressWarnings("unchecked")
private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);