From: tpantelis Date: Mon, 14 Jul 2014 16:35:57 +0000 (-0400) Subject: Bug 1392: Change ReadTransaction#read to return CheckedFuture X-Git-Tag: release/helium~356^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=10948fbda7e6d997525cce5b4929a1e426045c52 Bug 1392: Change ReadTransaction#read to return CheckedFuture Added a ReadFailedException that is used with the CheckedFuture. Moved the RpcError list from TransactionCommitFailedException to a new base OperationFailedException in yangtools and also derived ReadFailedException from OperationFailedException. Added a static MAPPER in ReadFailedException class using the new generalized base ExceptionMapper class in yangtools. Also derived TransactionCommitFailedExceptionMapper from ExceptionMapper. Modified uses of Futures#makeChecked in the read Tx and write Tx submit to use the new MappingCheckedFuture class in yangtools (see https://git.opendaylight.org/gerrit/#/c/9240/ for details). Change-Id: I5c4f717f0b8664b7d39c1e6f0366525f04e6634d Signed-off-by: tpantelis --- diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ReadTransaction.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ReadTransaction.java index cc85d4337b..b0c93734e0 100644 --- a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ReadTransaction.java +++ b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ReadTransaction.java @@ -9,11 +9,12 @@ package org.opendaylight.controller.md.sal.binding.api; import org.opendaylight.controller.md.sal.common.api.data.AsyncReadTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import com.google.common.base.Optional; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.CheckedFuture; /** * A transaction that provides read access to a logical data store. @@ -33,14 +34,17 @@ public interface ReadTransaction extends AsyncReadTransaction - *
  • If data at supplied path exists the - * {@link ListeblaFuture#get()} returns Optional object containing - * data once read is done. - *
  • If data at supplied path does not exists the - * {@link ListenbleFuture#get()} returns {@link Optional#absent()}. + *
  • If the data at the supplied path exists, the Future returns an Optional object + * containing the data.
  • + *
  • If the data at the supplied path does not exist, the Future returns + * Optional#absent().
  • + *
  • If the read of the data fails, the Future will fail with a + * {@link ReadFailedException} or an exception derived from ReadFailedException.
  • * */ - ListenableFuture> read(LogicalDatastoreType store, InstanceIdentifier path); + CheckedFuture,ReadFailedException> read( + LogicalDatastoreType store, InstanceIdentifier path); } diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedTransaction.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedTransaction.java index e52fcdce23..96a3f1cc3b 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedTransaction.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedTransaction.java @@ -9,17 +9,19 @@ package org.opendaylight.controller.md.sal.binding.impl; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; import org.opendaylight.yangtools.concepts.Delegator; import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; abstract class AbstractForwardedTransaction>> @@ -54,8 +56,13 @@ abstract class AbstractForwardedTransaction ListenableFuture> doRead(final DOMDataReadTransaction readTx, - final LogicalDatastoreType store, final org.opendaylight.yangtools.yang.binding.InstanceIdentifier path) { - return Futures.transform(readTx.read(store, codec.toNormalized(path)), codec.deserializeFunction(path)); + protected final CheckedFuture,ReadFailedException> doRead( + final DOMDataReadTransaction readTx, final LogicalDatastoreType store, + final org.opendaylight.yangtools.yang.binding.InstanceIdentifier path) { + + return MappingCheckedFuture.create( + Futures.transform(readTx.read(store, codec.toNormalized(path)), + codec.deserializeFunction(path)), + ReadFailedException.MAPPER); } } diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadTransactionImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadTransactionImpl.java index bb942047f2..fd0945f1a6 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadTransactionImpl.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadTransactionImpl.java @@ -9,12 +9,13 @@ package org.opendaylight.controller.md.sal.binding.impl; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import com.google.common.base.Optional; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.CheckedFuture; class BindingDataReadTransactionImpl extends AbstractForwardedTransaction implements ReadOnlyTransaction { @@ -25,8 +26,8 @@ class BindingDataReadTransactionImpl extends AbstractForwardedTransaction ListenableFuture> read(final LogicalDatastoreType store, - final InstanceIdentifier path) { + public CheckedFuture,ReadFailedException> read( + final LogicalDatastoreType store, final InstanceIdentifier path) { return doRead(getDelegate(),store, path); } diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadWriteTransactionImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadWriteTransactionImpl.java index c8b9d9347a..a1da029c24 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadWriteTransactionImpl.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadWriteTransactionImpl.java @@ -9,12 +9,13 @@ package org.opendaylight.controller.md.sal.binding.impl; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import com.google.common.base.Optional; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.CheckedFuture; class BindingDataReadWriteTransactionImpl extends BindingDataWriteTransactionImpl implements ReadWriteTransaction { @@ -25,8 +26,8 @@ class BindingDataReadWriteTransactionImpl extends } @Override - public ListenableFuture> read(final LogicalDatastoreType store, - final InstanceIdentifier path) { + public CheckedFuture,ReadFailedException> read( + final LogicalDatastoreType store, final InstanceIdentifier path) { return doRead(getDelegate(), store, path); } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-common-api/pom.xml b/opendaylight/md-sal/sal-common-api/pom.xml index 3449c5488e..e46fe1fe78 100644 --- a/opendaylight/md-sal/sal-common-api/pom.xml +++ b/opendaylight/md-sal/sal-common-api/pom.xml @@ -19,6 +19,10 @@ org.opendaylight.yangtools concepts + + org.opendaylight.yangtools + util + org.opendaylight.yangtools yang-common diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/ReadFailedException.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/ReadFailedException.java new file mode 100644 index 0000000000..b0a7807b76 --- /dev/null +++ b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/ReadFailedException.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.md.sal.common.api.data; + +import org.opendaylight.yangtools.util.concurrent.ExceptionMapper; +import org.opendaylight.yangtools.yang.common.OperationFailedException; +import org.opendaylight.yangtools.yang.common.RpcError; + +/** + * An exception for a failed read. + */ +public class ReadFailedException extends OperationFailedException { + + private static final long serialVersionUID = 1L; + + public static final ExceptionMapper MAPPER = + new ExceptionMapper("read", ReadFailedException.class) { + @Override + protected ReadFailedException newWithCause(String message, Throwable cause) { + return new ReadFailedException(message, cause); + } + }; + + public ReadFailedException(String message, RpcError... errors) { + super(message, errors); + } + + public ReadFailedException(String message, Throwable cause, RpcError... errors) { + super(message, cause, errors); + } +} diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitFailedException.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitFailedException.java index 18a857e1d5..7ac76e47b1 100644 --- a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitFailedException.java +++ b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitFailedException.java @@ -7,14 +7,8 @@ */ package org.opendaylight.controller.md.sal.common.api.data; -import java.util.Arrays; -import java.util.List; - +import org.opendaylight.yangtools.yang.common.OperationFailedException; import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; - -import com.google.common.collect.ImmutableList; /** * @@ -24,41 +18,16 @@ import com.google.common.collect.ImmutableList; * failed. * */ -public class TransactionCommitFailedException extends Exception { +public class TransactionCommitFailedException extends OperationFailedException { private static final long serialVersionUID = 1L; - private final List errorList; - public TransactionCommitFailedException(final String message, final RpcError... errors) { this(message, null, errors); } public TransactionCommitFailedException(final String message, final Throwable cause, final RpcError... errors) { - super(message, cause); - - if( errors != null && errors.length > 0 ) { - errorList = ImmutableList.builder().addAll( Arrays.asList( errors ) ).build(); - } - else { - // Add a default RpcError. - errorList = ImmutableList.of(RpcResultBuilder.newError(ErrorType.APPLICATION, null, - getMessage(), null, null, getCause())); - } - } - - /** - * Returns additional error information about this exception. - * - * @return a List of RpcErrors. There is always at least one RpcError. - */ - public List getErrorList() { - return errorList; - } - - @Override - public String getMessage() { - return new StringBuilder( super.getMessage() ).append(", errors: ").append( errorList ).toString(); + super(message, cause, errors); } } diff --git a/opendaylight/md-sal/sal-common-util/pom.xml b/opendaylight/md-sal/sal-common-util/pom.xml index 9108f8603b..e42c86a993 100644 --- a/opendaylight/md-sal/sal-common-util/pom.xml +++ b/opendaylight/md-sal/sal-common-util/pom.xml @@ -15,6 +15,11 @@ com.google.guava guava + + junit + junit + test + org.opendaylight.controller sal-common-api diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 2ef8e5f449..d21ea51c2a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -8,8 +8,11 @@ package org.opendaylight.controller.cluster.datastore; +import java.util.concurrent.Executors; + import akka.actor.ActorRef; import akka.actor.ActorSystem; + import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; @@ -30,8 +33,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * @@ -41,6 +44,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); + private static final int DEFAULT_EXECUTOR_POOL_SIZE = 10; private final String type; private final ActorContext actorContext; @@ -55,10 +59,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au * This is typically used when we need to make a request to an actor and * wait for it's response and the consumer needs to be provided a Future. * - * FIXME : Make the thread pool configurable + * FIXME : Make the thread pool size configurable. */ - private final ExecutorService executor = - Executors.newFixedThreadPool(10); + private final ListeningExecutorService executor = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_EXECUTOR_POOL_SIZE)); public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) { this(new ActorContext(actorSystem, actorSystem diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 999d0f8baf..a7089a7f75 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -15,9 +15,11 @@ import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; import akka.serialization.Serialization; + import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; + import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -74,7 +76,7 @@ public class Shard extends RaftActor { private final String name; - private SchemaContext schemaContext; + private volatile SchemaContext schemaContext; private final ShardStats shardMBean; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index b56dc9432f..56220656ad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -10,8 +10,10 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorSelection; + import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.ListeningExecutorService; + import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -29,7 +31,6 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; /** * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies @@ -42,14 +43,14 @@ public class ThreePhaseCommitCohortProxy implements private final ActorContext actorContext; private final List cohortPaths; - private final ExecutorService executor; + private final ListeningExecutorService executor; private final String transactionId; public ThreePhaseCommitCohortProxy(ActorContext actorContext, List cohortPaths, String transactionId, - ExecutorService executor) { + ListeningExecutorService executor) { this.actorContext = actorContext; this.cohortPaths = cohortPaths; @@ -58,42 +59,38 @@ public class ThreePhaseCommitCohortProxy implements } @Override public ListenableFuture canCommit() { - Callable call = new Callable() { - - @Override public Boolean call() throws Exception { - for(ActorPath actorPath : cohortPaths){ - ActorSelection cohort = actorContext.actorSelection(actorPath); - - try { - Object response = - actorContext.executeRemoteOperation(cohort, - new CanCommitTransaction().toSerializable(), - ActorContext.ASK_DURATION); - - if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { - CanCommitTransactionReply reply = - CanCommitTransactionReply.fromSerializable(response); - if (!reply.getCanCommit()) { - return false; + Callable call = new Callable() { + + @Override + public Boolean call() throws Exception { + for(ActorPath actorPath : cohortPaths){ + ActorSelection cohort = actorContext.actorSelection(actorPath); + + try { + Object response = + actorContext.executeRemoteOperation(cohort, + new CanCommitTransaction().toSerializable(), + ActorContext.ASK_DURATION); + + if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { + CanCommitTransactionReply reply = + CanCommitTransactionReply.fromSerializable(response); + if (!reply.getCanCommit()) { + System.out.println("**TOM - failed: false"); + return false; + } } + } catch(RuntimeException e){ + LOG.error("Unexpected Exception", e); + return false; } - } catch(RuntimeException e){ - LOG.error("Unexpected Exception", e); - return false; } - - } - return true; + return true; } }; - ListenableFutureTask - future = ListenableFutureTask.create(call); - - executor.submit(future); - - return future; + return executor.submit(call); } @Override public ListenableFuture preCommit() { @@ -138,13 +135,7 @@ public class ThreePhaseCommitCohortProxy implements } }; - ListenableFutureTask - future = ListenableFutureTask.create(call); - - executor.submit(future); - - return future; - + return executor.submit(call); } public List getCohortPaths() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 2e8538d077..5e9defa5b5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -15,17 +15,18 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import java.util.concurrent.ExecutorService; +import com.google.common.util.concurrent.ListeningExecutorService; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain{ private final ActorContext actorContext; - private final ExecutorService transactionExecutor; + private final ListeningExecutorService transactionExecutor; private final SchemaContext schemaContext; - public TransactionChainProxy(ActorContext actorContext, ExecutorService transactionExecutor, SchemaContext schemaContext) { + public TransactionChainProxy(ActorContext actorContext, ListeningExecutorService transactionExecutor, + SchemaContext schemaContext) { this.actorContext = actorContext; this.transactionExecutor = transactionExecutor; this.schemaContext = schemaContext; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index c85d32012f..cbd61b2087 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -12,10 +12,12 @@ import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; + import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.ListeningExecutorService; + import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; @@ -29,8 +31,10 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -42,7 +46,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; /** @@ -74,13 +77,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final ActorContext actorContext; private final Map remoteTransactionPaths = new HashMap<>(); private final String identifier; - private final ExecutorService executor; + private final ListeningExecutorService executor; private final SchemaContext schemaContext; public TransactionProxy( ActorContext actorContext, TransactionType transactionType, - ExecutorService executor, + ListeningExecutorService executor, SchemaContext schemaContext ) { @@ -94,7 +97,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } @Override - public ListenableFuture>> read(final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read( + final YangInstanceIdentifier path) { createTransactionIfMissing(actorContext, path); @@ -197,6 +201,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { remoteTransactionPaths.put(shardName, transactionContext); } } catch(TimeoutException e){ + LOG.warn("Timed out trying to create transaction on shard {}: {}", shardName, e); remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName)); } } @@ -214,7 +219,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { void mergeData(YangInstanceIdentifier path, NormalizedNode data); - ListenableFuture>> readData(final YangInstanceIdentifier path); + CheckedFuture>, ReadFailedException> readData( + final YangInstanceIdentifier path); void writeData(YangInstanceIdentifier path, NormalizedNode data); } @@ -266,9 +272,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null); } - @Override public ListenableFuture>> readData(final YangInstanceIdentifier path) { + @Override public CheckedFuture>, ReadFailedException> readData( + final YangInstanceIdentifier path) { - Callable>> call = new Callable() { + Callable>> call = new Callable>>() { @Override public Optional> call() throws Exception { Object response = actorContext @@ -279,20 +286,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(reply.getNormalizedNode() == null){ return Optional.absent(); } - //FIXME : A cast should not be required here ??? - return (Optional>) Optional.of(reply.getNormalizedNode()); + return Optional.>of(reply.getNormalizedNode()); } return Optional.absent(); } }; - ListenableFutureTask>> - future = ListenableFutureTask.create(call); - - executor.submit(future); - - return future; + return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { @@ -342,10 +343,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } @Override - public ListenableFuture>> readData( + public CheckedFuture>, ReadFailedException> readData( YangInstanceIdentifier path) { LOG.error("readData called path = {}", path); - return Futures.immediateFuture( + return Futures.immediateCheckedFuture( Optional.>absent()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index b5e3d24ef6..5f4ac57da0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -2,8 +2,10 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; + import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -19,6 +21,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; @@ -52,7 +56,9 @@ public class DistributedDataStoreIntegrationTest{ distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext()); - Thread.sleep(1000); + // This sleep is fragile - test can fail intermittently if all Shards aren't updated with + // the SchemaContext in time. Is there any way we can make this deterministic? + Thread.sleep(2000); DOMStoreReadWriteTransaction transaction = distributedDataStore.newReadWriteTransaction(); @@ -72,22 +78,21 @@ public class DistributedDataStoreIntegrationTest{ ListenableFuture canCommit = ready.canCommit(); - assertTrue(canCommit.get()); + assertTrue(canCommit.get(5, TimeUnit.SECONDS)); ListenableFuture preCommit = ready.preCommit(); - preCommit.get(); + preCommit.get(5, TimeUnit.SECONDS); ListenableFuture commit = ready.commit(); - commit.get(); - + commit.get(5, TimeUnit.SECONDS); } @Test public void integrationTestWithMultiShardConfiguration() - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException, TimeoutException { Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf"); ShardStrategyFactory.setConfiguration(configuration); @@ -97,7 +102,9 @@ public class DistributedDataStoreIntegrationTest{ distributedDataStore.onGlobalContextUpdated(SchemaContextHelper.full()); - Thread.sleep(1000); + // This sleep is fragile - test can fail intermittently if all Shards aren't updated with + // the SchemaContext in time. Is there any way we can make this deterministic? + Thread.sleep(2000); DOMStoreReadWriteTransaction transaction = distributedDataStore.newReadWriteTransaction(); @@ -109,16 +116,15 @@ public class DistributedDataStoreIntegrationTest{ ListenableFuture canCommit = ready.canCommit(); - assertTrue(canCommit.get()); + assertTrue(canCommit.get(5, TimeUnit.SECONDS)); ListenableFuture preCommit = ready.preCommit(); - preCommit.get(); + preCommit.get(5, TimeUnit.SECONDS); ListenableFuture commit = ready.commit(); - commit.get(); - + commit.get(5, TimeUnit.SECONDS); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 992518e100..4eca5671f6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -2,8 +2,14 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; + import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + import junit.framework.Assert; + +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -14,7 +20,6 @@ import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; import java.util.Arrays; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static org.junit.Assert.assertNotNull; @@ -25,7 +30,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { private Props props; private ActorRef actorRef; private MockActorContext actorContext; - private ExecutorService executor = Executors.newSingleThreadExecutor(); + private final ListeningExecutorService executor = MoreExecutors.listeningDecorator( + Executors.newSingleThreadExecutor()); @Before public void setUp(){ @@ -39,6 +45,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { } + @After + public void tearDown() { + executor.shutdownNow(); + } + @Test public void testCanCommit() throws Exception { actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index f654e3aced..7d9d2dad81 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -2,9 +2,15 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; + import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + import junit.framework.Assert; + +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; @@ -28,7 +34,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TransactionProxyTest extends AbstractActorTest { @@ -38,14 +43,19 @@ public class TransactionProxyTest extends AbstractActorTest { private final ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration ); - private ExecutorService transactionExecutor = - Executors.newSingleThreadExecutor(); + private final ListeningExecutorService transactionExecutor = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); @Before public void setUp(){ ShardStrategyFactory.setConfiguration(configuration); } + @After + public void tearDown() { + transactionExecutor.shutdownNow(); + } + @Test public void testRead() throws Exception { final Props props = Props.create(DoNothingActor.class); diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataReadTransaction.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataReadTransaction.java index afa2286d53..fc251c8445 100644 --- a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataReadTransaction.java +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataReadTransaction.java @@ -9,11 +9,12 @@ package org.opendaylight.controller.md.sal.dom.api; import org.opendaylight.controller.md.sal.common.api.data.AsyncReadTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import com.google.common.base.Optional; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.CheckedFuture; /** * A transaction that provides read access to a logical data store. @@ -33,14 +34,17 @@ public interface DOMDataReadTransaction extends AsyncReadTransaction - *
  • If data at supplied path exists the - * {@link ListeblaFuture#get()} returns Optional object containing - * data once read is done. - *
  • If data at supplied path does not exists the - * {@link ListenbleFuture#get()} returns {@link Optional#absent()}. + *
  • If the data at the supplied path exists, the Future returns an Optional object + * containing the data.
  • + *
  • If the data at the supplied path does not exist, the Future returns + * Optional#absent().
  • + *
  • If the read of the data fails, the Future will fail with a + * {@link ReadFailedException} or an exception derived from ReadFailedException.
  • * */ - ListenableFuture>> read(LogicalDatastoreType store,YangInstanceIdentifier path); + CheckedFuture>, ReadFailedException> read( + LogicalDatastoreType store, YangInstanceIdentifier path); } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java index 8b9eb445fd..9a6d12fb18 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -15,6 +15,7 @@ import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +92,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get())); } - return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + return MappingCheckedFuture.create(commitFuture, + TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } /** @@ -285,7 +287,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { */ @SuppressWarnings({ "unchecked", "rawtypes" }) ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); - return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER); + return MappingCheckedFuture.create(compositeResult, + TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER); } /** @@ -316,7 +319,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { */ @SuppressWarnings({ "unchecked", "rawtypes" }) ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); - return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + return MappingCheckedFuture.create(compositeResult, + TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } /** @@ -342,8 +346,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } ListenableFuture> allCanCommits = Futures.allAsList(canCommitOperations.build()); ListenableFuture allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION); - return Futures - .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER); + return MappingCheckedFuture.create(allSuccessFuture, + TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER); } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadOnlyTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadOnlyTransaction.java index c8edcbc6e2..b4562cf2ec 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadOnlyTransaction.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadOnlyTransaction.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -15,7 +16,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.CheckedFuture; /** * @@ -34,8 +35,8 @@ class DOMForwardedReadOnlyTransaction extends } @Override - public ListenableFuture>> read(final LogicalDatastoreType store, - final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read( + final LogicalDatastoreType store, final YangInstanceIdentifier path) { return getSubtransaction(store).read(path); } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java index e6521b2377..74a4c52e36 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java @@ -7,6 +7,7 @@ */package org.opendaylight.controller.md.sal.dom.broker.impl; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -14,7 +15,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.CheckedFuture; /** * @@ -45,8 +46,8 @@ class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction>> read(final LogicalDatastoreType store, - final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read( + final LogicalDatastoreType store, final YangInstanceIdentifier path) { return getSubtransaction(store).read(path); } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/TransactionCommitFailedExceptionMapper.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/TransactionCommitFailedExceptionMapper.java index 258b068929..799a8a09ed 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/TransactionCommitFailedExceptionMapper.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/TransactionCommitFailedExceptionMapper.java @@ -7,29 +7,16 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import java.util.concurrent.ExecutionException; - import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; +import org.opendaylight.yangtools.util.concurrent.ExceptionMapper; /** + * Utility exception mapper which translates Exception to {@link TransactionCommitFailedException}. * - * Utility exception mapper which translates {@link Exception} - * to {@link TransactionCommitFailedException}. - * - * This mapper is intended to be used with {@link com.google.common.util.concurrent.Futures#makeChecked(com.google.common.util.concurrent.ListenableFuture, Function)} - *
      - *
    • if exception is {@link TransactionCommitFailedException} or one of its subclasses returns original exception. - *
    • if exception is {@link ExecutionException} and cause is {@link TransactionCommitFailedException} return cause - *
    • otherwise returns {@link TransactionCommitFailedException} with original exception as a cause. - *
    - * + * @see ExceptionMapper */ - -final class TransactionCommitFailedExceptionMapper implements - Function { +final class TransactionCommitFailedExceptionMapper + extends ExceptionMapper { static final TransactionCommitFailedExceptionMapper PRE_COMMIT_MAPPER = create("canCommit"); @@ -37,10 +24,8 @@ final class TransactionCommitFailedExceptionMapper implements static final TransactionCommitFailedExceptionMapper COMMIT_ERROR_MAPPER = create("commit"); - private final String opName; - private TransactionCommitFailedExceptionMapper(final String opName) { - this.opName = Preconditions.checkNotNull(opName); + super( opName, TransactionCommitFailedException.class ); } public static final TransactionCommitFailedExceptionMapper create(final String opName) { @@ -48,22 +33,7 @@ final class TransactionCommitFailedExceptionMapper implements } @Override - public TransactionCommitFailedException apply(final Exception e) { - // If excetion is TransactionCommitFailedException - // we reuse it directly. - if (e instanceof TransactionCommitFailedException) { - return (TransactionCommitFailedException) e; - } - // If error is ExecutionException which was caused by cause of - // TransactionCommitFailedException - // we reuse original cause - if (e instanceof ExecutionException && e.getCause() instanceof TransactionCommitFailedException) { - return (TransactionCommitFailedException) e.getCause(); - } - if (e instanceof InterruptedException) { - return new TransactionCommitFailedException(opName + " failed - DOMStore was interupted.", e); - } - // Otherwise we are using new exception, with original cause - return new TransactionCommitFailedException(opName + " failed", e); + protected TransactionCommitFailedException newWithCause( String message, Throwable cause ) { + return new TransactionCommitFailedException( message, cause ); } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BackwardsCompatibleMountPoint.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BackwardsCompatibleMountPoint.java index 61ea47e39b..5bd8a7bc02 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BackwardsCompatibleMountPoint.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BackwardsCompatibleMountPoint.java @@ -15,16 +15,20 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; + import java.util.List; import java.util.Map; import java.util.Set; + import javax.annotation.Nullable; + import org.opendaylight.controller.md.sal.common.api.RegistrationListener; import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration; import org.opendaylight.controller.md.sal.common.api.data.DataReader; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; @@ -380,7 +384,8 @@ public class BackwardsCompatibleMountPoint implements MountProvisionInstance, Sc } @Override - public ListenableFuture>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read( + final LogicalDatastoreType store, final YangInstanceIdentifier path) { CompositeNode rawData = null; @@ -398,7 +403,7 @@ public class BackwardsCompatibleMountPoint implements MountProvisionInstance, Sc final Map.Entry> normalized = normalizer.toNormalized(path, rawData); final Optional> normalizedNodeOptional = Optional.>fromNullable(normalized.getValue()); - return com.google.common.util.concurrent.Futures.immediateFuture(normalizedNodeOptional); + return Futures.immediateCheckedFuture(normalizedNodeOptional); } } @@ -508,7 +513,8 @@ public class BackwardsCompatibleMountPoint implements MountProvisionInstance, Sc } @Override - public ListenableFuture>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read( + final LogicalDatastoreType store, final YangInstanceIdentifier path) { return new BackwardsCompatibleReadTransaction(dataReader, dataNormalizer).read(store, path); } diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/DOMStoreReadTransaction.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/DOMStoreReadTransaction.java index ae1b3ee2aa..84d09c7cb0 100644 --- a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/DOMStoreReadTransaction.java +++ b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/DOMStoreReadTransaction.java @@ -7,29 +7,31 @@ */ package org.opendaylight.controller.sal.core.spi.data; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import com.google.common.base.Optional; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.CheckedFuture; public interface DOMStoreReadTransaction extends DOMStoreTransaction { /** - * * Reads data from provided logical data store located at provided path * - * * @param path * Path which uniquely identifies subtree which client want to * read - * @return Listenable Future which contains read result + * @return a CheckFuture containing the result of the read. The Future blocks until the + * commit operation is complete. Once complete: *
      - *
    • If data at supplied path exists the {@link java.util.concurrent.Future#get()} - * returns Optional object containing data - *
    • If data at supplied path does not exists the - * {@link java.util.concurrent.Future#get()} returns {@link Optional#absent()}. + *
    • If the data at the supplied path exists, the Future returns an Optional object + * containing the data.
    • + *
    • If the data at the supplied path does not exist, the Future returns + * Optional#absent().
    • + *
    • If the read of the data fails, the Future will fail with a + * {@link ReadFailedException} or an exception derived from ReadFailedException.
    • *
    */ - ListenableFuture>> read(YangInstanceIdentifier path); + CheckedFuture>, ReadFailedException> read(YangInstanceIdentifier path); } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/pom.xml b/opendaylight/md-sal/sal-inmemory-datastore/pom.xml index 906847426c..725b24cd9e 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/pom.xml +++ b/opendaylight/md-sal/sal-inmemory-datastore/pom.xml @@ -112,12 +112,16 @@ binding-generator-impl test
    + + org.opendaylight.yangtools + mockito-configuration + test + org.opendaylight.controller sal-test-model test - ${project.version} - + diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java index 39d6483c52..2a98406343 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java @@ -8,9 +8,8 @@ package org.opendaylight.controller.md.sal.dom.store.impl; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -19,8 +18,8 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; /** * @@ -30,8 +29,9 @@ import com.google.common.util.concurrent.ListenableFuture; * which delegates most of its calls to similar methods provided by underlying snapshot. * */ -final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements -DOMStoreReadTransaction { +final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction + implements DOMStoreReadTransaction { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadTransaction.class); private DataTreeSnapshot stableSnapshot; @@ -48,9 +48,19 @@ DOMStoreReadTransaction { } @Override - public ListenableFuture>> read(final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { + LOG.debug("Tx: {} Read: {}", getIdentifier(), path); checkNotNull(path, "Path must not be null."); - checkState(stableSnapshot != null, "Transaction is closed"); - return Futures.immediateFuture(stableSnapshot.readNode(path)); + + if(stableSnapshot == null) { + return Futures.immediateFailedCheckedFuture(new ReadFailedException("Transaction is closed")); + } + + try { + return Futures.immediateCheckedFuture(stableSnapshot.readNode(path)); + } catch (Exception e) { + LOG.error("Tx: {} Failed Read of {}", getIdentifier(), path, e); + return Futures.immediateFailedCheckedFuture(new ReadFailedException("Read failed",e)); + } } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadWriteTransaction.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadWriteTransaction.java index ec17d7a3f7..5c5e9c6b6d 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadWriteTransaction.java @@ -7,7 +7,11 @@ */ package org.opendaylight.controller.md.sal.dom.store.impl; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -15,16 +19,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; /** * Implementation of Read-Write transaction which is backed by {@link DataTreeSnapshot} * and executed according to {@link TransactionReadyPrototype}. * */ -class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements -DOMStoreReadWriteTransaction { +class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction + implements DOMStoreReadWriteTransaction { private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadWriteTransaction.class); @@ -41,13 +45,20 @@ DOMStoreReadWriteTransaction { } @Override - public ListenableFuture>> read(final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { LOG.debug("Tx: {} Read: {}", getIdentifier(), path); + checkNotNull(path, "Path must not be null."); + + DataTreeModification dataView = getMutatedView(); + if(dataView == null) { + return Futures.immediateFailedCheckedFuture(new ReadFailedException("Transaction is closed")); + } + try { - return Futures.immediateFuture(getMutatedView().readNode(path)); + return Futures.immediateCheckedFuture(dataView.readNode(path)); } catch (Exception e) { LOG.error("Tx: {} Failed Read of {}", getIdentifier(), path, e); - throw e; + return Futures.immediateFailedCheckedFuture(new ReadFailedException("Read failed",e)); } } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java index 96369dea5f..9b105aa306 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.md.sal.dom.store.impl; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -16,12 +17,22 @@ import java.util.concurrent.ExecutionException; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import com.google.common.base.Optional; @@ -31,226 +42,373 @@ import com.google.common.util.concurrent.MoreExecutors; public class InMemoryDataStoreTest { - private SchemaContext schemaContext; - private InMemoryDOMDataStore domStore; + private SchemaContext schemaContext; + private InMemoryDOMDataStore domStore; - @Before - public void setupStore() { - domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor()); - schemaContext = TestModel.createTestContext(); - domStore.onGlobalContextUpdated(schemaContext); + @Before + public void setupStore() { + domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor()); + schemaContext = TestModel.createTestContext(); + domStore.onGlobalContextUpdated(schemaContext); + } - } + @Test + public void testTransactionIsolation() throws InterruptedException, ExecutionException { - @Test - public void testTransactionIsolation() throws InterruptedException, ExecutionException { + assertNotNull(domStore); - assertNotNull(domStore); + DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction(); + assertNotNull(readTx); - DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction(); - assertNotNull(readTx); + DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction(); + assertNotNull(writeTx); - DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction(); - assertNotNull(writeTx); - /** - * - * Writes /test in writeTx - * - */ - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + /** + * Writes /test in writeTx + */ + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx.write(TestModel.TEST_PATH, testNode); - /** - * - * Reads /test from writeTx Read should return container. - * - */ - ListenableFuture>> writeTxContainer = writeTx.read(TestModel.TEST_PATH); - assertTrue(writeTxContainer.get().isPresent()); + /** + * Reads /test from writeTx Read should return container. + */ + ListenableFuture>> writeTxContainer = writeTx.read(TestModel.TEST_PATH); + assertEquals("read: isPresent", true, writeTxContainer.get().isPresent()); + assertEquals("read: data", testNode, writeTxContainer.get().get()); - /** - * - * Reads /test from readTx Read should return Absent. - * - */ - ListenableFuture>> readTxContainer = readTx.read(TestModel.TEST_PATH); - assertFalse(readTxContainer.get().isPresent()); - } + /** + * Reads /test from readTx Read should return Absent. + */ + ListenableFuture>> readTxContainer = readTx.read(TestModel.TEST_PATH); + assertEquals("read: isPresent", false, readTxContainer.get().isPresent()); + } - @Test - public void testTransactionCommit() throws InterruptedException, ExecutionException { + @Test + public void testTransactionCommit() throws InterruptedException, ExecutionException { - DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction(); - assertNotNull(writeTx); - /** - * - * Writes /test in writeTx - * - */ - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction(); + assertNotNull(writeTx); - /** - * - * Reads /test from writeTx Read should return container. - * - */ - ListenableFuture>> writeTxContainer = writeTx.read(TestModel.TEST_PATH); - assertTrue(writeTxContainer.get().isPresent()); + /** + * Writes /test in writeTx + */ + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx.write(TestModel.TEST_PATH, testNode); - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + /** + * Reads /test from writeTx Read should return container. + */ + ListenableFuture>> writeTxContainer = writeTx.read(TestModel.TEST_PATH); + assertEquals("read: isPresent", true, writeTxContainer.get().isPresent()); + assertEquals("read: data", testNode, writeTxContainer.get().get()); - assertThreePhaseCommit(cohort); + DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - Optional> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH) - .get(); - assertTrue(afterCommitRead.isPresent()); - } + assertThreePhaseCommit(cohort); - @Test - public void testTransactionAbort() throws InterruptedException, ExecutionException { + Optional> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH) + .get(); + assertEquals("After commit read: isPresent", true, afterCommitRead.isPresent()); + assertEquals("After commit read: data", testNode, afterCommitRead.get()); + } - DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction(); - assertNotNull(writeTx); + @Test + public void testDelete() throws Exception { - assertTestContainerWrite(writeTx); + DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction(); + assertNotNull( writeTx ); - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + // Write /test and commit - assertTrue(cohort.canCommit().get().booleanValue()); - cohort.preCommit().get(); - cohort.abort().get(); + writeTx.write( TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) ); - Optional> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH) - .get(); - assertFalse(afterCommitRead.isPresent()); - } + assertThreePhaseCommit( writeTx.ready() ); - @Test - public void testTransactionChain() throws InterruptedException, ExecutionException { - DOMStoreTransactionChain txChain = domStore.createTransactionChain(); - assertNotNull(txChain); + Optional> afterCommitRead = domStore.newReadOnlyTransaction(). + read(TestModel.TEST_PATH ).get(); + assertEquals( "After commit read: isPresent", true, afterCommitRead.isPresent() ); - /** - * We alocate new read-write transaction and write /test - * - * - */ - DOMStoreReadWriteTransaction firstTx = txChain.newReadWriteTransaction(); - assertTestContainerWrite(firstTx); + // Delete /test and verify - /** - * First transaction is marked as ready, we are able to allocate chained - * transactions - */ - DOMStoreThreePhaseCommitCohort firstWriteTxCohort = firstTx.ready(); + writeTx = domStore.newWriteOnlyTransaction(); - /** - * We alocate chained transaction - read transaction, note first one is - * still not commited to datastore. - */ - DOMStoreReadTransaction secondReadTx = txChain.newReadOnlyTransaction(); + writeTx.delete( TestModel.TEST_PATH ); - /** - * - * We test if we are able to read data from tx, read should not fail - * since we are using chained transaction. - * - * - */ - assertTestContainerExists(secondReadTx); + assertThreePhaseCommit( writeTx.ready() ); - /** - * - * We alocate next transaction, which is still based on first one, but - * is read-write. - * - */ - DOMStoreReadWriteTransaction thirdDeleteTx = txChain.newReadWriteTransaction(); + afterCommitRead = domStore.newReadOnlyTransaction(). + read(TestModel.TEST_PATH ).get(); + assertEquals( "After commit read: isPresent", false, afterCommitRead.isPresent() ); + } - /** - * We test existence of /test in third transaction container should - * still be visible from first one (which is still uncommmited). - * - * - */ - assertTestContainerExists(thirdDeleteTx); + @Test + public void testMerge() throws Exception { - /** - * We delete node in third transaction - */ - thirdDeleteTx.delete(TestModel.TEST_PATH); + DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction(); + assertNotNull( writeTx ); - /** - * third transaction is sealed. - */ - DOMStoreThreePhaseCommitCohort thirdDeleteTxCohort = thirdDeleteTx.ready(); + ContainerNode containerNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier( new NodeIdentifier( TestModel.TEST_QNAME ) ) + .addChild( ImmutableNodes.mapNodeBuilder( TestModel.OUTER_LIST_QNAME ) + .addChild( ImmutableNodes.mapEntry( TestModel.OUTER_LIST_QNAME, + TestModel.ID_QNAME, 1 ) ).build() ).build(); - /** - * We commit first transaction - * - */ - assertThreePhaseCommit(firstWriteTxCohort); + writeTx.merge( TestModel.TEST_PATH, containerNode ); - // Alocates store transacion - DOMStoreReadTransaction storeReadTx = domStore.newReadOnlyTransaction(); - /** - * We verify transaction is commited to store, container should exists - * in datastore. - */ - assertTestContainerExists(storeReadTx); - /** - * We commit third transaction - * - */ - assertThreePhaseCommit(thirdDeleteTxCohort); - } + assertThreePhaseCommit( writeTx.ready() ); - @Test - @Ignore - public void testTransactionConflict() throws InterruptedException, ExecutionException { - DOMStoreReadWriteTransaction txOne = domStore.newReadWriteTransaction(); - DOMStoreReadWriteTransaction txTwo = domStore.newReadWriteTransaction(); - assertTestContainerWrite(txOne); - assertTestContainerWrite(txTwo); + Optional> afterCommitRead = domStore.newReadOnlyTransaction(). + read(TestModel.TEST_PATH ).get(); + assertEquals( "After commit read: isPresent", true, afterCommitRead.isPresent() ); + assertEquals( "After commit read: data", containerNode, afterCommitRead.get() ); - /** - * Commits transaction - */ - assertThreePhaseCommit(txOne.ready()); + // Merge a new list entry node - /** - * Asserts that txTwo could not be commited - */ - assertFalse(txTwo.ready().canCommit().get()); - } - - private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort) - throws InterruptedException, ExecutionException { - assertTrue(cohort.canCommit().get().booleanValue()); - cohort.preCommit().get(); - cohort.commit().get(); - } - - private static Optional> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx) - throws InterruptedException, ExecutionException { - /** - * - * Writes /test in writeTx - * - */ - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx = domStore.newWriteOnlyTransaction(); + assertNotNull( writeTx ); + + containerNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier( new NodeIdentifier( TestModel.TEST_QNAME ) ) + .addChild( ImmutableNodes.mapNodeBuilder( TestModel.OUTER_LIST_QNAME ) + .addChild( ImmutableNodes.mapEntry( TestModel.OUTER_LIST_QNAME, + TestModel.ID_QNAME, 1 ) ) + .addChild( ImmutableNodes.mapEntry( TestModel.OUTER_LIST_QNAME, + TestModel.ID_QNAME, 2 ) ).build() ).build(); + + writeTx.merge( TestModel.TEST_PATH, containerNode ); + + assertThreePhaseCommit( writeTx.ready() ); + + afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH ).get(); + assertEquals( "After commit read: isPresent", true, afterCommitRead.isPresent() ); + assertEquals( "After commit read: data", containerNode, afterCommitRead.get() ); + } + + @Test(expected=ReadFailedException.class) + public void testReadWithReadOnlyTransactionClosed() throws Throwable { + + DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction(); + assertNotNull( readTx ); + + readTx.close(); + + doReadAndThrowEx( readTx ); + } + + @Test(expected=ReadFailedException.class) + public void testReadWithReadOnlyTransactionFailure() throws Throwable { + + DataTreeSnapshot mockSnapshot = Mockito.mock( DataTreeSnapshot.class ); + Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockSnapshot ) + .readNode( Mockito.any( YangInstanceIdentifier.class ) ); + + DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction( "1", mockSnapshot ); + + doReadAndThrowEx( readTx ); + } - return assertTestContainerExists(writeTx); - } + @Test(expected=ReadFailedException.class) + public void testReadWithReadWriteTransactionClosed() throws Throwable { - /** - * Reads /test from readTx Read should return container. - */ - private static Optional> assertTestContainerExists(final DOMStoreReadTransaction readTx) - throws InterruptedException, ExecutionException { + DOMStoreReadTransaction readTx = domStore.newReadWriteTransaction(); + assertNotNull( readTx ); + + readTx.close(); + + doReadAndThrowEx( readTx ); + } + + @Test(expected=ReadFailedException.class) + public void testReadWithReadWriteTransactionFailure() throws Throwable { + + DataTreeSnapshot mockSnapshot = Mockito.mock( DataTreeSnapshot.class ); + DataTreeModification mockModification = Mockito.mock( DataTreeModification.class ); + Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockModification ) + .readNode( Mockito.any( YangInstanceIdentifier.class ) ); + Mockito.doReturn( mockModification ).when( mockSnapshot ).newModification(); + TransactionReadyPrototype mockReady = Mockito.mock( TransactionReadyPrototype.class ); + DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction( "1", mockSnapshot, mockReady ); + + doReadAndThrowEx( readTx ); + } + + private void doReadAndThrowEx( DOMStoreReadTransaction readTx ) throws Throwable { + + try { + readTx.read(TestModel.TEST_PATH).get(); + } catch( ExecutionException e ) { + throw e.getCause(); + } + } + + @Test(expected=IllegalStateException.class) + public void testWriteWithTransactionReady() throws Exception { + + DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction(); + + writeTx.ready(); + + // Should throw ex + writeTx.write( TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) ); + } + + @Test(expected=IllegalStateException.class) + public void testReadyWithTransactionAlreadyReady() throws Exception { + + DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction(); + + writeTx.ready(); + + // Should throw ex + writeTx.ready(); + } + + @Test + public void testTransactionAbort() throws InterruptedException, ExecutionException { + + DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction(); + assertNotNull(writeTx); + + assertTestContainerWrite(writeTx); + + DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + + assertTrue(cohort.canCommit().get().booleanValue()); + cohort.preCommit().get(); + cohort.abort().get(); + + Optional> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH) + .get(); + assertFalse(afterCommitRead.isPresent()); + } + + @Test + public void testTransactionChain() throws InterruptedException, ExecutionException { + DOMStoreTransactionChain txChain = domStore.createTransactionChain(); + assertNotNull(txChain); + + /** + * We alocate new read-write transaction and write /test + * + * + */ + DOMStoreReadWriteTransaction firstTx = txChain.newReadWriteTransaction(); + assertTestContainerWrite(firstTx); + + /** + * First transaction is marked as ready, we are able to allocate chained + * transactions + */ + DOMStoreThreePhaseCommitCohort firstWriteTxCohort = firstTx.ready(); + + /** + * We alocate chained transaction - read transaction, note first one is + * still not commited to datastore. + */ + DOMStoreReadTransaction secondReadTx = txChain.newReadOnlyTransaction(); + + /** + * + * We test if we are able to read data from tx, read should not fail + * since we are using chained transaction. + * + * + */ + assertTestContainerExists(secondReadTx); + + /** + * + * We alocate next transaction, which is still based on first one, but + * is read-write. + * + */ + DOMStoreReadWriteTransaction thirdDeleteTx = txChain.newReadWriteTransaction(); + + /** + * We test existence of /test in third transaction container should + * still be visible from first one (which is still uncommmited). + * + * + */ + assertTestContainerExists(thirdDeleteTx); + + /** + * We delete node in third transaction + */ + thirdDeleteTx.delete(TestModel.TEST_PATH); + + /** + * third transaction is sealed. + */ + DOMStoreThreePhaseCommitCohort thirdDeleteTxCohort = thirdDeleteTx.ready(); + + /** + * We commit first transaction + * + */ + assertThreePhaseCommit(firstWriteTxCohort); + + // Alocates store transacion + DOMStoreReadTransaction storeReadTx = domStore.newReadOnlyTransaction(); + /** + * We verify transaction is commited to store, container should exists + * in datastore. + */ + assertTestContainerExists(storeReadTx); + /** + * We commit third transaction + * + */ + assertThreePhaseCommit(thirdDeleteTxCohort); + } + + @Test + @Ignore + public void testTransactionConflict() throws InterruptedException, ExecutionException { + DOMStoreReadWriteTransaction txOne = domStore.newReadWriteTransaction(); + DOMStoreReadWriteTransaction txTwo = domStore.newReadWriteTransaction(); + assertTestContainerWrite(txOne); + assertTestContainerWrite(txTwo); + + /** + * Commits transaction + */ + assertThreePhaseCommit(txOne.ready()); + + /** + * Asserts that txTwo could not be commited + */ + assertFalse(txTwo.ready().canCommit().get()); + } + + private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort) + throws InterruptedException, ExecutionException { + assertTrue(cohort.canCommit().get().booleanValue()); + cohort.preCommit().get(); + cohort.commit().get(); + } + + private static Optional> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx) + throws InterruptedException, ExecutionException { + /** + * + * Writes /test in writeTx + * + */ + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + return assertTestContainerExists(writeTx); + } + + /** + * Reads /test from readTx Read should return container. + */ + private static Optional> assertTestContainerExists(final DOMStoreReadTransaction readTx) + throws InterruptedException, ExecutionException { - ListenableFuture>> writeTxContainer = readTx.read(TestModel.TEST_PATH); - assertTrue(writeTxContainer.get().isPresent()); - return writeTxContainer.get(); - } + ListenableFuture>> writeTxContainer = readTx.read(TestModel.TEST_PATH); + assertTrue(writeTxContainer.get().isPresent()); + return writeTxContainer.get(); + } } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadOnlyTx.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadOnlyTx.java index 9ef44f6584..33789fb786 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadOnlyTx.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadOnlyTx.java @@ -16,15 +16,18 @@ import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessag import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException; import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -49,11 +52,12 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction this.id = id; } - public ListenableFuture>> readConfigurationData(final YangInstanceIdentifier path) { + private CheckedFuture>, ReadFailedException> readConfigurationData( + final YangInstanceIdentifier path) { final ListenableFuture> future = rpc.invokeRpc(NETCONF_GET_CONFIG_QNAME, NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))); - return Futures.transform(future, new Function, Optional>>() { + ListenableFuture>> transformedFuture = Futures.transform(future, new Function, Optional>>() { @Override public Optional> apply(final RpcResult result) { checkReadSuccess(result, path); @@ -66,6 +70,8 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction transform(path, node); } }); + + return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER); } private void checkReadSuccess(final RpcResult result, final YangInstanceIdentifier path) { @@ -85,10 +91,11 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction } } - public ListenableFuture>> readOperationalData(final YangInstanceIdentifier path) { + private CheckedFuture>, ReadFailedException> readOperationalData( + final YangInstanceIdentifier path) { final ListenableFuture> future = rpc.invokeRpc(NETCONF_GET_QNAME, NetconfMessageTransformUtil.wrap(NETCONF_GET_QNAME, toFilterStructure(path))); - return Futures.transform(future, new Function, Optional>>() { + ListenableFuture>> transformedFuture = Futures.transform(future, new Function, Optional>>() { @Override public Optional> apply(final RpcResult result) { checkReadSuccess(result, path); @@ -101,6 +108,8 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction transform(path, node); } }); + + return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER); } private static Node findNode(final CompositeNode node, final YangInstanceIdentifier identifier) { @@ -136,7 +145,8 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction } @Override - public ListenableFuture>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read( + final LogicalDatastoreType store, final YangInstanceIdentifier path) { final YangInstanceIdentifier legacyPath = toLegacyPath(normalizer, path, id); switch (store) { diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadWriteTx.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadWriteTx.java index 4054cf9403..3d2c3b9d44 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadWriteTx.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadWriteTx.java @@ -11,8 +11,10 @@ package org.opendaylight.controller.sal.connect.netconf.sal.tx; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; + import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; @@ -62,7 +64,8 @@ public class NetconfDeviceReadWriteTx implements DOMDataReadWriteTransaction { } @Override - public ListenableFuture>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read( + final LogicalDatastoreType store, final YangInstanceIdentifier path) { return delegateReadTx.read(store, path); }