Added a ReadFailedException that is used with the CheckedFuture.
Moved the RpcError list from TransactionCommitFailedException to a new base
OperationFailedException in yangtools and also derived ReadFailedException from
OperationFailedException.
Added a static MAPPER in ReadFailedException class using the new generalized base
ExceptionMapper class in yangtools. Also derived
TransactionCommitFailedExceptionMapper from ExceptionMapper.
Modified uses of Futures#makeChecked in the read Tx and write Tx submit
to use the new MappingCheckedFuture class in yangtools (see
https://git.opendaylight.org/gerrit/#/c/9240/ for details).
Change-Id: I5c4f717f0b8664b7d39c1e6f0366525f04e6634d
Signed-off-by: tpantelis <tpanteli@brocade.com>
import org.opendaylight.controller.md.sal.common.api.data.AsyncReadTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
* A transaction that provides read access to a logical data store.
* @param path
* Path which uniquely identifies subtree which client want to
* read
- * @return Listenable Future which contains read result
+ * @return a CheckFuture containing the result of the read. The Future blocks until the
+ * commit operation is complete. Once complete:
* <ul>
- * <li>If data at supplied path exists the
- * {@link ListeblaFuture#get()} returns Optional object containing
- * data once read is done.
- * <li>If data at supplied path does not exists the
- * {@link ListenbleFuture#get()} returns {@link Optional#absent()}.
+ * <li>If the data at the supplied path exists, the Future returns an Optional object
+ * containing the data.</li>
+ * <li>If the data at the supplied path does not exist, the Future returns
+ * Optional#absent().</li>
+ * <li>If the read of the data fails, the Future will fail with a
+ * {@link ReadFailedException} or an exception derived from ReadFailedException.</li>
* </ul>
*/
- <T extends DataObject> ListenableFuture<Optional<T>> read(LogicalDatastoreType store, InstanceIdentifier<T> path);
+ <T extends DataObject> CheckedFuture<Optional<T>,ReadFailedException> read(
+ LogicalDatastoreType store, InstanceIdentifier<T> path);
}
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
import org.opendaylight.yangtools.concepts.Delegator;
import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
abstract class AbstractForwardedTransaction<T extends AsyncTransaction<YangInstanceIdentifier, NormalizedNode<?, ?>>>
return codec;
}
- protected final <T extends DataObject> ListenableFuture<Optional<T>> doRead(final DOMDataReadTransaction readTx,
- final LogicalDatastoreType store, final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<T> path) {
- return Futures.transform(readTx.read(store, codec.toNormalized(path)), codec.deserializeFunction(path));
+ protected final <T extends DataObject> CheckedFuture<Optional<T>,ReadFailedException> doRead(
+ final DOMDataReadTransaction readTx, final LogicalDatastoreType store,
+ final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<T> path) {
+
+ return MappingCheckedFuture.create(
+ Futures.transform(readTx.read(store, codec.toNormalized(path)),
+ codec.deserializeFunction(path)),
+ ReadFailedException.MAPPER);
}
}
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
class BindingDataReadTransactionImpl extends AbstractForwardedTransaction<DOMDataReadOnlyTransaction> implements
ReadOnlyTransaction {
}
@Override
- public <T extends DataObject> ListenableFuture<Optional<T>> read(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path) {
+ public <T extends DataObject> CheckedFuture<Optional<T>,ReadFailedException> read(
+ final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
return doRead(getDelegate(),store, path);
}
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
class BindingDataReadWriteTransactionImpl extends
BindingDataWriteTransactionImpl<DOMDataReadWriteTransaction> implements ReadWriteTransaction {
}
@Override
- public <T extends DataObject> ListenableFuture<Optional<T>> read(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path) {
+ public <T extends DataObject> CheckedFuture<Optional<T>,ReadFailedException> read(
+ final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
return doRead(getDelegate(), store, path);
}
}
\ No newline at end of file
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>concepts</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>util</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-common</artifactId>
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.api.data;
+
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
+import org.opendaylight.yangtools.yang.common.OperationFailedException;
+import org.opendaylight.yangtools.yang.common.RpcError;
+
+/**
+ * An exception for a failed read.
+ */
+public class ReadFailedException extends OperationFailedException {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final ExceptionMapper<ReadFailedException> MAPPER =
+ new ExceptionMapper<ReadFailedException>("read", ReadFailedException.class) {
+ @Override
+ protected ReadFailedException newWithCause(String message, Throwable cause) {
+ return new ReadFailedException(message, cause);
+ }
+ };
+
+ public ReadFailedException(String message, RpcError... errors) {
+ super(message, errors);
+ }
+
+ public ReadFailedException(String message, Throwable cause, RpcError... errors) {
+ super(message, cause, errors);
+ }
+}
*/
package org.opendaylight.controller.md.sal.common.api.data;
-import java.util.Arrays;
-import java.util.List;
-
+import org.opendaylight.yangtools.yang.common.OperationFailedException;
import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-
-import com.google.common.collect.ImmutableList;
/**
*
* failed.
*
*/
-public class TransactionCommitFailedException extends Exception {
+public class TransactionCommitFailedException extends OperationFailedException {
private static final long serialVersionUID = 1L;
- private final List<RpcError> errorList;
-
public TransactionCommitFailedException(final String message, final RpcError... errors) {
this(message, null, errors);
}
public TransactionCommitFailedException(final String message, final Throwable cause,
final RpcError... errors) {
- super(message, cause);
-
- if( errors != null && errors.length > 0 ) {
- errorList = ImmutableList.<RpcError>builder().addAll( Arrays.asList( errors ) ).build();
- }
- else {
- // Add a default RpcError.
- errorList = ImmutableList.of(RpcResultBuilder.newError(ErrorType.APPLICATION, null,
- getMessage(), null, null, getCause()));
- }
- }
-
- /**
- * Returns additional error information about this exception.
- *
- * @return a List of RpcErrors. There is always at least one RpcError.
- */
- public List<RpcError> getErrorList() {
- return errorList;
- }
-
- @Override
- public String getMessage() {
- return new StringBuilder( super.getMessage() ).append(", errors: ").append( errorList ).toString();
+ super(message, cause, errors);
}
}
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-common-api</artifactId>
package org.opendaylight.controller.cluster.datastore;
+import java.util.concurrent.Executors;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
/**
*
private static final Logger
LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+ private static final int DEFAULT_EXECUTOR_POOL_SIZE = 10;
private final String type;
private final ActorContext actorContext;
* This is typically used when we need to make a request to an actor and
* wait for it's response and the consumer needs to be provided a Future.
*
- * FIXME : Make the thread pool configurable
+ * FIXME : Make the thread pool size configurable.
*/
- private final ExecutorService executor =
- Executors.newFixedThreadPool(10);
+ private final ListeningExecutorService executor =
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_EXECUTOR_POOL_SIZE));
public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
this(new ActorContext(actorSystem, actorSystem
import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.serialization.Serialization;
+
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
private final String name;
- private SchemaContext schemaContext;
+ private volatile SchemaContext schemaContext;
private final ShardStats shardMBean;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
+
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
private final ActorContext actorContext;
private final List<ActorPath> cohortPaths;
- private final ExecutorService executor;
+ private final ListeningExecutorService executor;
private final String transactionId;
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<ActorPath> cohortPaths,
String transactionId,
- ExecutorService executor) {
+ ListeningExecutorService executor) {
this.actorContext = actorContext;
this.cohortPaths = cohortPaths;
}
@Override public ListenableFuture<Boolean> canCommit() {
- Callable<Boolean> call = new Callable() {
-
- @Override public Boolean call() throws Exception {
- for(ActorPath actorPath : cohortPaths){
- ActorSelection cohort = actorContext.actorSelection(actorPath);
-
- try {
- Object response =
- actorContext.executeRemoteOperation(cohort,
- new CanCommitTransaction().toSerializable(),
- ActorContext.ASK_DURATION);
-
- if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
- CanCommitTransactionReply reply =
- CanCommitTransactionReply.fromSerializable(response);
- if (!reply.getCanCommit()) {
- return false;
+ Callable<Boolean> call = new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ for(ActorPath actorPath : cohortPaths){
+ ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+ try {
+ Object response =
+ actorContext.executeRemoteOperation(cohort,
+ new CanCommitTransaction().toSerializable(),
+ ActorContext.ASK_DURATION);
+
+ if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+ CanCommitTransactionReply reply =
+ CanCommitTransactionReply.fromSerializable(response);
+ if (!reply.getCanCommit()) {
+ System.out.println("**TOM - failed: false");
+ return false;
+ }
}
+ } catch(RuntimeException e){
+ LOG.error("Unexpected Exception", e);
+ return false;
}
- } catch(RuntimeException e){
- LOG.error("Unexpected Exception", e);
- return false;
}
-
- }
- return true;
+ return true;
}
};
- ListenableFutureTask<Boolean>
- future = ListenableFutureTask.create(call);
-
- executor.submit(future);
-
- return future;
+ return executor.submit(call);
}
@Override public ListenableFuture<Void> preCommit() {
}
};
- ListenableFutureTask<Void>
- future = ListenableFutureTask.create(call);
-
- executor.submit(future);
-
- return future;
-
+ return executor.submit(call);
}
public List<ActorPath> getCohortPaths() {
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.concurrent.ExecutorService;
+import com.google.common.util.concurrent.ListeningExecutorService;
/**
* TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
*/
public class TransactionChainProxy implements DOMStoreTransactionChain{
private final ActorContext actorContext;
- private final ExecutorService transactionExecutor;
+ private final ListeningExecutorService transactionExecutor;
private final SchemaContext schemaContext;
- public TransactionChainProxy(ActorContext actorContext, ExecutorService transactionExecutor, SchemaContext schemaContext) {
+ public TransactionChainProxy(ActorContext actorContext, ListeningExecutorService transactionExecutor,
+ SchemaContext schemaContext) {
this.actorContext = actorContext;
this.transactionExecutor = transactionExecutor;
this.schemaContext = schemaContext;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
+
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/**
private final ActorContext actorContext;
private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
private final String identifier;
- private final ExecutorService executor;
+ private final ListeningExecutorService executor;
private final SchemaContext schemaContext;
public TransactionProxy(
ActorContext actorContext,
TransactionType transactionType,
- ExecutorService executor,
+ ListeningExecutorService executor,
SchemaContext schemaContext
) {
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+ final YangInstanceIdentifier path) {
createTransactionIfMissing(actorContext, path);
remoteTransactionPaths.put(shardName, transactionContext);
}
} catch(TimeoutException e){
+ LOG.warn("Timed out trying to create transaction on shard {}: {}", shardName, e);
remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName));
}
}
void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
- ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(final YangInstanceIdentifier path);
+ CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path);
void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
}
getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
}
- @Override public ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(final YangInstanceIdentifier path) {
+ @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path) {
- Callable<Optional<NormalizedNode<?,?>>> call = new Callable() {
+ Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
@Override public Optional<NormalizedNode<?,?>> call() throws Exception {
Object response = actorContext
if(reply.getNormalizedNode() == null){
return Optional.absent();
}
- //FIXME : A cast should not be required here ???
- return (Optional<NormalizedNode<?, ?>>) Optional.of(reply.getNormalizedNode());
+ return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
}
return Optional.absent();
}
};
- ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
- future = ListenableFutureTask.create(call);
-
- executor.submit(future);
-
- return future;
+ return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
}
@Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
LOG.error("readData called path = {}", path);
- return Futures.immediateFuture(
+ return Futures.immediateCheckedFuture(
Optional.<NormalizedNode<?, ?>>absent());
}
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
- Thread.sleep(1000);
+ // This sleep is fragile - test can fail intermittently if all Shards aren't updated with
+ // the SchemaContext in time. Is there any way we can make this deterministic?
+ Thread.sleep(2000);
DOMStoreReadWriteTransaction transaction =
distributedDataStore.newReadWriteTransaction();
ListenableFuture<Boolean> canCommit = ready.canCommit();
- assertTrue(canCommit.get());
+ assertTrue(canCommit.get(5, TimeUnit.SECONDS));
ListenableFuture<Void> preCommit = ready.preCommit();
- preCommit.get();
+ preCommit.get(5, TimeUnit.SECONDS);
ListenableFuture<Void> commit = ready.commit();
- commit.get();
-
+ commit.get(5, TimeUnit.SECONDS);
}
@Test
public void integrationTestWithMultiShardConfiguration()
- throws ExecutionException, InterruptedException {
+ throws ExecutionException, InterruptedException, TimeoutException {
Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
ShardStrategyFactory.setConfiguration(configuration);
distributedDataStore.onGlobalContextUpdated(SchemaContextHelper.full());
- Thread.sleep(1000);
+ // This sleep is fragile - test can fail intermittently if all Shards aren't updated with
+ // the SchemaContext in time. Is there any way we can make this deterministic?
+ Thread.sleep(2000);
DOMStoreReadWriteTransaction transaction =
distributedDataStore.newReadWriteTransaction();
ListenableFuture<Boolean> canCommit = ready.canCommit();
- assertTrue(canCommit.get());
+ assertTrue(canCommit.get(5, TimeUnit.SECONDS));
ListenableFuture<Void> preCommit = ready.preCommit();
- preCommit.get();
+ preCommit.get(5, TimeUnit.SECONDS);
ListenableFuture<Void> commit = ready.commit();
- commit.get();
-
+ commit.get(5, TimeUnit.SECONDS);
}
}
import akka.actor.ActorRef;
import akka.actor.Props;
+
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
import junit.framework.Assert;
+
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
import java.util.Arrays;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertNotNull;
private Props props;
private ActorRef actorRef;
private MockActorContext actorContext;
- private ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
+ Executors.newSingleThreadExecutor());
@Before
public void setUp(){
}
+ @After
+ public void tearDown() {
+ executor.shutdownNow();
+ }
+
@Test
public void testCanCommit() throws Exception {
actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable());
import akka.actor.ActorRef;
import akka.actor.Props;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
import junit.framework.Assert;
+
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TransactionProxyTest extends AbstractActorTest {
private final ActorContext testContext =
new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );
- private ExecutorService transactionExecutor =
- Executors.newSingleThreadExecutor();
+ private final ListeningExecutorService transactionExecutor =
+ MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
@Before
public void setUp(){
ShardStrategyFactory.setConfiguration(configuration);
}
+ @After
+ public void tearDown() {
+ transactionExecutor.shutdownNow();
+ }
+
@Test
public void testRead() throws Exception {
final Props props = Props.create(DoNothingActor.class);
import org.opendaylight.controller.md.sal.common.api.data.AsyncReadTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
* A transaction that provides read access to a logical data store.
* @param path
* Path which uniquely identifies subtree which client want to
* read
- * @return Listenable Future which contains read result
+ * @return a CheckFuture containing the result of the read. The Future blocks until the
+ * commit operation is complete. Once complete:
* <ul>
- * <li>If data at supplied path exists the
- * {@link ListeblaFuture#get()} returns Optional object containing
- * data once read is done.
- * <li>If data at supplied path does not exists the
- * {@link ListenbleFuture#get()} returns {@link Optional#absent()}.
+ * <li>If the data at the supplied path exists, the Future returns an Optional object
+ * containing the data.</li>
+ * <li>If the data at the supplied path does not exist, the Future returns
+ * Optional#absent().</li>
+ * <li>If the read of the data fails, the Future will fail with a
+ * {@link ReadFailedException} or an exception derived from ReadFailedException.</li>
* </ul>
*/
- ListenableFuture<Optional<NormalizedNode<?,?>>> read(LogicalDatastoreType store,YangInstanceIdentifier path);
+ CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+ LogicalDatastoreType store, YangInstanceIdentifier path);
}
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
}
- return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+ return MappingCheckedFuture.create(commitFuture,
+ TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
/**
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
- return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
+ return MappingCheckedFuture.create(compositeResult,
+ TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
}
/**
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
- return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+ return MappingCheckedFuture.create(compositeResult,
+ TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
/**
}
ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
- return Futures
- .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
+ return MappingCheckedFuture.create(allSuccessFuture,
+ TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
}
package org.opendaylight.controller.md.sal.dom.broker.impl;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
*
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+ final LogicalDatastoreType store, final YangInstanceIdentifier path) {
return getSubtransaction(store).read(path);
}
*/package org.opendaylight.controller.md.sal.dom.broker.impl;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
*
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+ final LogicalDatastoreType store, final YangInstanceIdentifier path) {
return getSubtransaction(store).read(path);
}
}
\ No newline at end of file
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import java.util.concurrent.ExecutionException;
-
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
/**
+ * Utility exception mapper which translates Exception to {@link TransactionCommitFailedException}.
*
- * Utility exception mapper which translates {@link Exception}
- * to {@link TransactionCommitFailedException}.
- *
- * This mapper is intended to be used with {@link com.google.common.util.concurrent.Futures#makeChecked(com.google.common.util.concurrent.ListenableFuture, Function)}
- * <ul>
- * <li>if exception is {@link TransactionCommitFailedException} or one of its subclasses returns original exception.
- * <li>if exception is {@link ExecutionException} and cause is {@link TransactionCommitFailedException} return cause
- * <li>otherwise returns {@link TransactionCommitFailedException} with original exception as a cause.
- * </ul>
- *
+ * @see ExceptionMapper
*/
-
-final class TransactionCommitFailedExceptionMapper implements
- Function<Exception, TransactionCommitFailedException> {
+final class TransactionCommitFailedExceptionMapper
+ extends ExceptionMapper<TransactionCommitFailedException> {
static final TransactionCommitFailedExceptionMapper PRE_COMMIT_MAPPER = create("canCommit");
static final TransactionCommitFailedExceptionMapper COMMIT_ERROR_MAPPER = create("commit");
- private final String opName;
-
private TransactionCommitFailedExceptionMapper(final String opName) {
- this.opName = Preconditions.checkNotNull(opName);
+ super( opName, TransactionCommitFailedException.class );
}
public static final TransactionCommitFailedExceptionMapper create(final String opName) {
}
@Override
- public TransactionCommitFailedException apply(final Exception e) {
- // If excetion is TransactionCommitFailedException
- // we reuse it directly.
- if (e instanceof TransactionCommitFailedException) {
- return (TransactionCommitFailedException) e;
- }
- // If error is ExecutionException which was caused by cause of
- // TransactionCommitFailedException
- // we reuse original cause
- if (e instanceof ExecutionException && e.getCause() instanceof TransactionCommitFailedException) {
- return (TransactionCommitFailedException) e.getCause();
- }
- if (e instanceof InterruptedException) {
- return new TransactionCommitFailedException(opName + " failed - DOMStore was interupted.", e);
- }
- // Otherwise we are using new exception, with original cause
- return new TransactionCommitFailedException(opName + " failed", e);
+ protected TransactionCommitFailedException newWithCause( String message, Throwable cause ) {
+ return new TransactionCommitFailedException( message, cause );
}
}
\ No newline at end of file
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import javax.annotation.Nullable;
+
import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
import org.opendaylight.controller.md.sal.common.api.data.DataReader;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+ final LogicalDatastoreType store, final YangInstanceIdentifier path) {
CompositeNode rawData = null;
final Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalized = normalizer.toNormalized(path, rawData);
final Optional<NormalizedNode<?, ?>> normalizedNodeOptional = Optional.<NormalizedNode<?, ?>>fromNullable(normalized.getValue());
- return com.google.common.util.concurrent.Futures.immediateFuture(normalizedNodeOptional);
+ return Futures.immediateCheckedFuture(normalizedNodeOptional);
}
}
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+ final LogicalDatastoreType store, final YangInstanceIdentifier path) {
return new BackwardsCompatibleReadTransaction(dataReader, dataNormalizer).read(store, path);
}
*/
package org.opendaylight.controller.sal.core.spi.data;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
public interface DOMStoreReadTransaction extends DOMStoreTransaction {
/**
- *
* Reads data from provided logical data store located at provided path
*
- *
* @param path
* Path which uniquely identifies subtree which client want to
* read
- * @return Listenable Future which contains read result
+ * @return a CheckFuture containing the result of the read. The Future blocks until the
+ * commit operation is complete. Once complete:
* <ul>
- * <li>If data at supplied path exists the {@link java.util.concurrent.Future#get()}
- * returns Optional object containing data
- * <li>If data at supplied path does not exists the
- * {@link java.util.concurrent.Future#get()} returns {@link Optional#absent()}.
+ * <li>If the data at the supplied path exists, the Future returns an Optional object
+ * containing the data.</li>
+ * <li>If the data at the supplied path does not exist, the Future returns
+ * Optional#absent().</li>
+ * <li>If the read of the data fails, the Future will fail with a
+ * {@link ReadFailedException} or an exception derived from ReadFailedException.</li>
* </ul>
*/
- ListenableFuture<Optional<NormalizedNode<?,?>>> read(YangInstanceIdentifier path);
+ CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(YangInstanceIdentifier path);
}
<artifactId>binding-generator-impl</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>mockito-configuration</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-test-model</artifactId>
<scope>test</scope>
- <version>${project.version}</version>
- </dependency>
+ </dependency>
</dependencies>
<build>
package org.opendaylight.controller.md.sal.dom.store.impl;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
/**
*
* which delegates most of its calls to similar methods provided by underlying snapshot.
*
*/
-final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements
-DOMStoreReadTransaction {
+final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction
+ implements DOMStoreReadTransaction {
+
private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadTransaction.class);
private DataTreeSnapshot stableSnapshot;
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+ LOG.debug("Tx: {} Read: {}", getIdentifier(), path);
checkNotNull(path, "Path must not be null.");
- checkState(stableSnapshot != null, "Transaction is closed");
- return Futures.immediateFuture(stableSnapshot.readNode(path));
+
+ if(stableSnapshot == null) {
+ return Futures.immediateFailedCheckedFuture(new ReadFailedException("Transaction is closed"));
+ }
+
+ try {
+ return Futures.immediateCheckedFuture(stableSnapshot.readNode(path));
+ } catch (Exception e) {
+ LOG.error("Tx: {} Failed Read of {}", getIdentifier(), path, e);
+ return Futures.immediateFailedCheckedFuture(new ReadFailedException("Read failed",e));
+ }
}
}
\ No newline at end of file
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
/**
* Implementation of Read-Write transaction which is backed by {@link DataTreeSnapshot}
* and executed according to {@link TransactionReadyPrototype}.
*
*/
-class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements
-DOMStoreReadWriteTransaction {
+class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction
+ implements DOMStoreReadWriteTransaction {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadWriteTransaction.class);
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
LOG.debug("Tx: {} Read: {}", getIdentifier(), path);
+ checkNotNull(path, "Path must not be null.");
+
+ DataTreeModification dataView = getMutatedView();
+ if(dataView == null) {
+ return Futures.immediateFailedCheckedFuture(new ReadFailedException("Transaction is closed"));
+ }
+
try {
- return Futures.immediateFuture(getMutatedView().readNode(path));
+ return Futures.immediateCheckedFuture(dataView.readNode(path));
} catch (Exception e) {
LOG.error("Tx: {} Failed Read of {}", getIdentifier(), path, e);
- throw e;
+ return Futures.immediateFailedCheckedFuture(new ReadFailedException("Read failed",e));
}
}
}
\ No newline at end of file
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import com.google.common.base.Optional;
public class InMemoryDataStoreTest {
- private SchemaContext schemaContext;
- private InMemoryDOMDataStore domStore;
+ private SchemaContext schemaContext;
+ private InMemoryDOMDataStore domStore;
- @Before
- public void setupStore() {
- domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
- schemaContext = TestModel.createTestContext();
- domStore.onGlobalContextUpdated(schemaContext);
+ @Before
+ public void setupStore() {
+ domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
+ schemaContext = TestModel.createTestContext();
+ domStore.onGlobalContextUpdated(schemaContext);
+ }
- }
+ @Test
+ public void testTransactionIsolation() throws InterruptedException, ExecutionException {
- @Test
- public void testTransactionIsolation() throws InterruptedException, ExecutionException {
+ assertNotNull(domStore);
- assertNotNull(domStore);
+ DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction();
+ assertNotNull(readTx);
- DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction();
- assertNotNull(readTx);
+ DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
+ assertNotNull(writeTx);
- DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
- assertNotNull(writeTx);
- /**
- *
- * Writes /test in writeTx
- *
- */
- writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ /**
+ * Writes /test in writeTx
+ */
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeTx.write(TestModel.TEST_PATH, testNode);
- /**
- *
- * Reads /test from writeTx Read should return container.
- *
- */
- ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
- assertTrue(writeTxContainer.get().isPresent());
+ /**
+ * Reads /test from writeTx Read should return container.
+ */
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
+ assertEquals("read: isPresent", true, writeTxContainer.get().isPresent());
+ assertEquals("read: data", testNode, writeTxContainer.get().get());
- /**
- *
- * Reads /test from readTx Read should return Absent.
- *
- */
- ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx.read(TestModel.TEST_PATH);
- assertFalse(readTxContainer.get().isPresent());
- }
+ /**
+ * Reads /test from readTx Read should return Absent.
+ */
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx.read(TestModel.TEST_PATH);
+ assertEquals("read: isPresent", false, readTxContainer.get().isPresent());
+ }
- @Test
- public void testTransactionCommit() throws InterruptedException, ExecutionException {
+ @Test
+ public void testTransactionCommit() throws InterruptedException, ExecutionException {
- DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
- assertNotNull(writeTx);
- /**
- *
- * Writes /test in writeTx
- *
- */
- writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
+ assertNotNull(writeTx);
- /**
- *
- * Reads /test from writeTx Read should return container.
- *
- */
- ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
- assertTrue(writeTxContainer.get().isPresent());
+ /**
+ * Writes /test in writeTx
+ */
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeTx.write(TestModel.TEST_PATH, testNode);
- DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+ /**
+ * Reads /test from writeTx Read should return container.
+ */
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
+ assertEquals("read: isPresent", true, writeTxContainer.get().isPresent());
+ assertEquals("read: data", testNode, writeTxContainer.get().get());
- assertThreePhaseCommit(cohort);
+ DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
- Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
- .get();
- assertTrue(afterCommitRead.isPresent());
- }
+ assertThreePhaseCommit(cohort);
- @Test
- public void testTransactionAbort() throws InterruptedException, ExecutionException {
+ Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
+ .get();
+ assertEquals("After commit read: isPresent", true, afterCommitRead.isPresent());
+ assertEquals("After commit read: data", testNode, afterCommitRead.get());
+ }
- DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
- assertNotNull(writeTx);
+ @Test
+ public void testDelete() throws Exception {
- assertTestContainerWrite(writeTx);
+ DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction();
+ assertNotNull( writeTx );
- DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+ // Write /test and commit
- assertTrue(cohort.canCommit().get().booleanValue());
- cohort.preCommit().get();
- cohort.abort().get();
+ writeTx.write( TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
- Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
- .get();
- assertFalse(afterCommitRead.isPresent());
- }
+ assertThreePhaseCommit( writeTx.ready() );
- @Test
- public void testTransactionChain() throws InterruptedException, ExecutionException {
- DOMStoreTransactionChain txChain = domStore.createTransactionChain();
- assertNotNull(txChain);
+ Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().
+ read(TestModel.TEST_PATH ).get();
+ assertEquals( "After commit read: isPresent", true, afterCommitRead.isPresent() );
- /**
- * We alocate new read-write transaction and write /test
- *
- *
- */
- DOMStoreReadWriteTransaction firstTx = txChain.newReadWriteTransaction();
- assertTestContainerWrite(firstTx);
+ // Delete /test and verify
- /**
- * First transaction is marked as ready, we are able to allocate chained
- * transactions
- */
- DOMStoreThreePhaseCommitCohort firstWriteTxCohort = firstTx.ready();
+ writeTx = domStore.newWriteOnlyTransaction();
- /**
- * We alocate chained transaction - read transaction, note first one is
- * still not commited to datastore.
- */
- DOMStoreReadTransaction secondReadTx = txChain.newReadOnlyTransaction();
+ writeTx.delete( TestModel.TEST_PATH );
- /**
- *
- * We test if we are able to read data from tx, read should not fail
- * since we are using chained transaction.
- *
- *
- */
- assertTestContainerExists(secondReadTx);
+ assertThreePhaseCommit( writeTx.ready() );
- /**
- *
- * We alocate next transaction, which is still based on first one, but
- * is read-write.
- *
- */
- DOMStoreReadWriteTransaction thirdDeleteTx = txChain.newReadWriteTransaction();
+ afterCommitRead = domStore.newReadOnlyTransaction().
+ read(TestModel.TEST_PATH ).get();
+ assertEquals( "After commit read: isPresent", false, afterCommitRead.isPresent() );
+ }
- /**
- * We test existence of /test in third transaction container should
- * still be visible from first one (which is still uncommmited).
- *
- *
- */
- assertTestContainerExists(thirdDeleteTx);
+ @Test
+ public void testMerge() throws Exception {
- /**
- * We delete node in third transaction
- */
- thirdDeleteTx.delete(TestModel.TEST_PATH);
+ DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction();
+ assertNotNull( writeTx );
- /**
- * third transaction is sealed.
- */
- DOMStoreThreePhaseCommitCohort thirdDeleteTxCohort = thirdDeleteTx.ready();
+ ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier( new NodeIdentifier( TestModel.TEST_QNAME ) )
+ .addChild( ImmutableNodes.mapNodeBuilder( TestModel.OUTER_LIST_QNAME )
+ .addChild( ImmutableNodes.mapEntry( TestModel.OUTER_LIST_QNAME,
+ TestModel.ID_QNAME, 1 ) ).build() ).build();
- /**
- * We commit first transaction
- *
- */
- assertThreePhaseCommit(firstWriteTxCohort);
+ writeTx.merge( TestModel.TEST_PATH, containerNode );
- // Alocates store transacion
- DOMStoreReadTransaction storeReadTx = domStore.newReadOnlyTransaction();
- /**
- * We verify transaction is commited to store, container should exists
- * in datastore.
- */
- assertTestContainerExists(storeReadTx);
- /**
- * We commit third transaction
- *
- */
- assertThreePhaseCommit(thirdDeleteTxCohort);
- }
+ assertThreePhaseCommit( writeTx.ready() );
- @Test
- @Ignore
- public void testTransactionConflict() throws InterruptedException, ExecutionException {
- DOMStoreReadWriteTransaction txOne = domStore.newReadWriteTransaction();
- DOMStoreReadWriteTransaction txTwo = domStore.newReadWriteTransaction();
- assertTestContainerWrite(txOne);
- assertTestContainerWrite(txTwo);
+ Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().
+ read(TestModel.TEST_PATH ).get();
+ assertEquals( "After commit read: isPresent", true, afterCommitRead.isPresent() );
+ assertEquals( "After commit read: data", containerNode, afterCommitRead.get() );
- /**
- * Commits transaction
- */
- assertThreePhaseCommit(txOne.ready());
+ // Merge a new list entry node
- /**
- * Asserts that txTwo could not be commited
- */
- assertFalse(txTwo.ready().canCommit().get());
- }
-
- private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort)
- throws InterruptedException, ExecutionException {
- assertTrue(cohort.canCommit().get().booleanValue());
- cohort.preCommit().get();
- cohort.commit().get();
- }
-
- private static Optional<NormalizedNode<?, ?>> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx)
- throws InterruptedException, ExecutionException {
- /**
- *
- * Writes /test in writeTx
- *
- */
- writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ writeTx = domStore.newWriteOnlyTransaction();
+ assertNotNull( writeTx );
+
+ containerNode = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier( new NodeIdentifier( TestModel.TEST_QNAME ) )
+ .addChild( ImmutableNodes.mapNodeBuilder( TestModel.OUTER_LIST_QNAME )
+ .addChild( ImmutableNodes.mapEntry( TestModel.OUTER_LIST_QNAME,
+ TestModel.ID_QNAME, 1 ) )
+ .addChild( ImmutableNodes.mapEntry( TestModel.OUTER_LIST_QNAME,
+ TestModel.ID_QNAME, 2 ) ).build() ).build();
+
+ writeTx.merge( TestModel.TEST_PATH, containerNode );
+
+ assertThreePhaseCommit( writeTx.ready() );
+
+ afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH ).get();
+ assertEquals( "After commit read: isPresent", true, afterCommitRead.isPresent() );
+ assertEquals( "After commit read: data", containerNode, afterCommitRead.get() );
+ }
+
+ @Test(expected=ReadFailedException.class)
+ public void testReadWithReadOnlyTransactionClosed() throws Throwable {
+
+ DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction();
+ assertNotNull( readTx );
+
+ readTx.close();
+
+ doReadAndThrowEx( readTx );
+ }
+
+ @Test(expected=ReadFailedException.class)
+ public void testReadWithReadOnlyTransactionFailure() throws Throwable {
+
+ DataTreeSnapshot mockSnapshot = Mockito.mock( DataTreeSnapshot.class );
+ Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockSnapshot )
+ .readNode( Mockito.any( YangInstanceIdentifier.class ) );
+
+ DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction( "1", mockSnapshot );
+
+ doReadAndThrowEx( readTx );
+ }
- return assertTestContainerExists(writeTx);
- }
+ @Test(expected=ReadFailedException.class)
+ public void testReadWithReadWriteTransactionClosed() throws Throwable {
- /**
- * Reads /test from readTx Read should return container.
- */
- private static Optional<NormalizedNode<?, ?>> assertTestContainerExists(final DOMStoreReadTransaction readTx)
- throws InterruptedException, ExecutionException {
+ DOMStoreReadTransaction readTx = domStore.newReadWriteTransaction();
+ assertNotNull( readTx );
+
+ readTx.close();
+
+ doReadAndThrowEx( readTx );
+ }
+
+ @Test(expected=ReadFailedException.class)
+ public void testReadWithReadWriteTransactionFailure() throws Throwable {
+
+ DataTreeSnapshot mockSnapshot = Mockito.mock( DataTreeSnapshot.class );
+ DataTreeModification mockModification = Mockito.mock( DataTreeModification.class );
+ Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockModification )
+ .readNode( Mockito.any( YangInstanceIdentifier.class ) );
+ Mockito.doReturn( mockModification ).when( mockSnapshot ).newModification();
+ TransactionReadyPrototype mockReady = Mockito.mock( TransactionReadyPrototype.class );
+ DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction( "1", mockSnapshot, mockReady );
+
+ doReadAndThrowEx( readTx );
+ }
+
+ private void doReadAndThrowEx( DOMStoreReadTransaction readTx ) throws Throwable {
+
+ try {
+ readTx.read(TestModel.TEST_PATH).get();
+ } catch( ExecutionException e ) {
+ throw e.getCause();
+ }
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testWriteWithTransactionReady() throws Exception {
+
+ DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction();
+
+ writeTx.ready();
+
+ // Should throw ex
+ writeTx.write( TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testReadyWithTransactionAlreadyReady() throws Exception {
+
+ DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction();
+
+ writeTx.ready();
+
+ // Should throw ex
+ writeTx.ready();
+ }
+
+ @Test
+ public void testTransactionAbort() throws InterruptedException, ExecutionException {
+
+ DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
+ assertNotNull(writeTx);
+
+ assertTestContainerWrite(writeTx);
+
+ DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+
+ assertTrue(cohort.canCommit().get().booleanValue());
+ cohort.preCommit().get();
+ cohort.abort().get();
+
+ Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
+ .get();
+ assertFalse(afterCommitRead.isPresent());
+ }
+
+ @Test
+ public void testTransactionChain() throws InterruptedException, ExecutionException {
+ DOMStoreTransactionChain txChain = domStore.createTransactionChain();
+ assertNotNull(txChain);
+
+ /**
+ * We alocate new read-write transaction and write /test
+ *
+ *
+ */
+ DOMStoreReadWriteTransaction firstTx = txChain.newReadWriteTransaction();
+ assertTestContainerWrite(firstTx);
+
+ /**
+ * First transaction is marked as ready, we are able to allocate chained
+ * transactions
+ */
+ DOMStoreThreePhaseCommitCohort firstWriteTxCohort = firstTx.ready();
+
+ /**
+ * We alocate chained transaction - read transaction, note first one is
+ * still not commited to datastore.
+ */
+ DOMStoreReadTransaction secondReadTx = txChain.newReadOnlyTransaction();
+
+ /**
+ *
+ * We test if we are able to read data from tx, read should not fail
+ * since we are using chained transaction.
+ *
+ *
+ */
+ assertTestContainerExists(secondReadTx);
+
+ /**
+ *
+ * We alocate next transaction, which is still based on first one, but
+ * is read-write.
+ *
+ */
+ DOMStoreReadWriteTransaction thirdDeleteTx = txChain.newReadWriteTransaction();
+
+ /**
+ * We test existence of /test in third transaction container should
+ * still be visible from first one (which is still uncommmited).
+ *
+ *
+ */
+ assertTestContainerExists(thirdDeleteTx);
+
+ /**
+ * We delete node in third transaction
+ */
+ thirdDeleteTx.delete(TestModel.TEST_PATH);
+
+ /**
+ * third transaction is sealed.
+ */
+ DOMStoreThreePhaseCommitCohort thirdDeleteTxCohort = thirdDeleteTx.ready();
+
+ /**
+ * We commit first transaction
+ *
+ */
+ assertThreePhaseCommit(firstWriteTxCohort);
+
+ // Alocates store transacion
+ DOMStoreReadTransaction storeReadTx = domStore.newReadOnlyTransaction();
+ /**
+ * We verify transaction is commited to store, container should exists
+ * in datastore.
+ */
+ assertTestContainerExists(storeReadTx);
+ /**
+ * We commit third transaction
+ *
+ */
+ assertThreePhaseCommit(thirdDeleteTxCohort);
+ }
+
+ @Test
+ @Ignore
+ public void testTransactionConflict() throws InterruptedException, ExecutionException {
+ DOMStoreReadWriteTransaction txOne = domStore.newReadWriteTransaction();
+ DOMStoreReadWriteTransaction txTwo = domStore.newReadWriteTransaction();
+ assertTestContainerWrite(txOne);
+ assertTestContainerWrite(txTwo);
+
+ /**
+ * Commits transaction
+ */
+ assertThreePhaseCommit(txOne.ready());
+
+ /**
+ * Asserts that txTwo could not be commited
+ */
+ assertFalse(txTwo.ready().canCommit().get());
+ }
+
+ private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort)
+ throws InterruptedException, ExecutionException {
+ assertTrue(cohort.canCommit().get().booleanValue());
+ cohort.preCommit().get();
+ cohort.commit().get();
+ }
+
+ private static Optional<NormalizedNode<?, ?>> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx)
+ throws InterruptedException, ExecutionException {
+ /**
+ *
+ * Writes /test in writeTx
+ *
+ */
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ return assertTestContainerExists(writeTx);
+ }
+
+ /**
+ * Reads /test from readTx Read should return container.
+ */
+ private static Optional<NormalizedNode<?, ?>> assertTestContainerExists(final DOMStoreReadTransaction readTx)
+ throws InterruptedException, ExecutionException {
- ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = readTx.read(TestModel.TEST_PATH);
- assertTrue(writeTxContainer.get().isPresent());
- return writeTxContainer.get();
- }
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = readTx.read(TestModel.TEST_PATH);
+ assertTrue(writeTxContainer.get().isPresent());
+ return writeTxContainer.get();
+ }
}
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
this.id = id;
}
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> readConfigurationData(final YangInstanceIdentifier path) {
+ private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readConfigurationData(
+ final YangInstanceIdentifier path) {
final ListenableFuture<RpcResult<CompositeNode>> future = rpc.invokeRpc(NETCONF_GET_CONFIG_QNAME,
NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path)));
- return Futures.transform(future, new Function<RpcResult<CompositeNode>, Optional<NormalizedNode<?, ?>>>() {
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> transformedFuture = Futures.transform(future, new Function<RpcResult<CompositeNode>, Optional<NormalizedNode<?, ?>>>() {
@Override
public Optional<NormalizedNode<?, ?>> apply(final RpcResult<CompositeNode> result) {
checkReadSuccess(result, path);
transform(path, node);
}
});
+
+ return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER);
}
private void checkReadSuccess(final RpcResult<CompositeNode> result, final YangInstanceIdentifier path) {
}
}
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> readOperationalData(final YangInstanceIdentifier path) {
+ private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readOperationalData(
+ final YangInstanceIdentifier path) {
final ListenableFuture<RpcResult<CompositeNode>> future = rpc.invokeRpc(NETCONF_GET_QNAME, NetconfMessageTransformUtil.wrap(NETCONF_GET_QNAME, toFilterStructure(path)));
- return Futures.transform(future, new Function<RpcResult<CompositeNode>, Optional<NormalizedNode<?, ?>>>() {
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> transformedFuture = Futures.transform(future, new Function<RpcResult<CompositeNode>, Optional<NormalizedNode<?, ?>>>() {
@Override
public Optional<NormalizedNode<?, ?>> apply(final RpcResult<CompositeNode> result) {
checkReadSuccess(result, path);
transform(path, node);
}
});
+
+ return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER);
}
private static Node<?> findNode(final CompositeNode node, final YangInstanceIdentifier identifier) {
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+ final LogicalDatastoreType store, final YangInstanceIdentifier path) {
final YangInstanceIdentifier legacyPath = toLegacyPath(normalizer, path, id);
switch (store) {
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+ final LogicalDatastoreType store, final YangInstanceIdentifier path) {
return delegateReadTx.read(store, path);
}