Bug 1392: Change ReadTransaction#read to return CheckedFuture 78/9178/12
authortpantelis <tpanteli@brocade.com>
Mon, 14 Jul 2014 16:35:57 +0000 (12:35 -0400)
committertpantelis <tpanteli@brocade.com>
Tue, 29 Jul 2014 17:10:50 +0000 (13:10 -0400)
Added a ReadFailedException that is used with the CheckedFuture.
Moved the RpcError list from TransactionCommitFailedException to a new base
OperationFailedException in yangtools and also derived ReadFailedException from
OperationFailedException.

Added a static MAPPER in ReadFailedException class using the new generalized base
ExceptionMapper class in yangtools. Also derived
TransactionCommitFailedExceptionMapper from ExceptionMapper.

Modified uses of Futures#makeChecked in the read Tx and write Tx submit
to use the new MappingCheckedFuture class in yangtools (see
https://git.opendaylight.org/gerrit/#/c/9240/ for details).

Change-Id: I5c4f717f0b8664b7d39c1e6f0366525f04e6634d
Signed-off-by: tpantelis <tpanteli@brocade.com>
29 files changed:
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/ReadTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadTransactionImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadWriteTransactionImpl.java
opendaylight/md-sal/sal-common-api/pom.xml
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/ReadFailedException.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitFailedException.java
opendaylight/md-sal/sal-common-util/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataReadTransaction.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadOnlyTransaction.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/TransactionCommitFailedExceptionMapper.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BackwardsCompatibleMountPoint.java
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/DOMStoreReadTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/pom.xml
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadWriteTransaction.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadOnlyTx.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadWriteTx.java

index cc85d43..b0c9373 100644 (file)
@@ -9,11 +9,12 @@ package org.opendaylight.controller.md.sal.binding.api;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncReadTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  * A transaction that provides read access to a logical data store.
@@ -33,14 +34,17 @@ public interface ReadTransaction extends AsyncReadTransaction<InstanceIdentifier
      * @param path
      *            Path which uniquely identifies subtree which client want to
      *            read
-     * @return Listenable Future which contains read result
+     * @return a CheckFuture containing the result of the read. The Future blocks until the
+     *         commit operation is complete. Once complete:
      *         <ul>
-     *         <li>If data at supplied path exists the
-     *         {@link ListeblaFuture#get()} returns Optional object containing
-     *         data once read is done.
-     *         <li>If data at supplied path does not exists the
-     *         {@link ListenbleFuture#get()} returns {@link Optional#absent()}.
+     *         <li>If the data at the supplied path exists, the Future returns an Optional object
+     *         containing the data.</li>
+     *         <li>If the data at the supplied path does not exist, the Future returns
+     *         Optional#absent().</li>
+     *         <li>If the read of the data fails, the Future will fail with a
+     *         {@link ReadFailedException} or an exception derived from ReadFailedException.</li>
      *         </ul>
      */
-    <T extends DataObject> ListenableFuture<Optional<T>> read(LogicalDatastoreType store, InstanceIdentifier<T> path);
+    <T extends DataObject> CheckedFuture<Optional<T>,ReadFailedException> read(
+            LogicalDatastoreType store, InstanceIdentifier<T> path);
 }
index e52fcdc..96a3f1c 100644 (file)
@@ -9,17 +9,19 @@ package org.opendaylight.controller.md.sal.binding.impl;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
 import org.opendaylight.yangtools.concepts.Delegator;
 import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 
 
 abstract class AbstractForwardedTransaction<T extends AsyncTransaction<YangInstanceIdentifier, NormalizedNode<?, ?>>>
@@ -54,8 +56,13 @@ abstract class AbstractForwardedTransaction<T extends AsyncTransaction<YangInsta
         return codec;
     }
 
-    protected final <T extends DataObject> ListenableFuture<Optional<T>> doRead(final DOMDataReadTransaction readTx,
-            final LogicalDatastoreType store, final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<T> path) {
-        return Futures.transform(readTx.read(store, codec.toNormalized(path)), codec.deserializeFunction(path));
+    protected final <T extends DataObject> CheckedFuture<Optional<T>,ReadFailedException> doRead(
+            final DOMDataReadTransaction readTx, final LogicalDatastoreType store,
+            final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<T> path) {
+
+        return MappingCheckedFuture.create(
+                    Futures.transform(readTx.read(store, codec.toNormalized(path)),
+                                      codec.deserializeFunction(path)),
+                    ReadFailedException.MAPPER);
     }
 }
index bb94204..fd0945f 100644 (file)
@@ -9,12 +9,13 @@ package org.opendaylight.controller.md.sal.binding.impl;
 
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 
 class BindingDataReadTransactionImpl extends AbstractForwardedTransaction<DOMDataReadOnlyTransaction> implements
         ReadOnlyTransaction {
@@ -25,8 +26,8 @@ class BindingDataReadTransactionImpl extends AbstractForwardedTransaction<DOMDat
     }
 
     @Override
-    public <T extends DataObject> ListenableFuture<Optional<T>> read(final LogicalDatastoreType store,
-            final InstanceIdentifier<T> path) {
+    public <T extends DataObject> CheckedFuture<Optional<T>,ReadFailedException> read(
+            final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
         return doRead(getDelegate(),store, path);
     }
 
index c8b9d93..a1da029 100644 (file)
@@ -9,12 +9,13 @@ package org.opendaylight.controller.md.sal.binding.impl;
 
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 
 class BindingDataReadWriteTransactionImpl extends
         BindingDataWriteTransactionImpl<DOMDataReadWriteTransaction> implements ReadWriteTransaction {
@@ -25,8 +26,8 @@ class BindingDataReadWriteTransactionImpl extends
     }
 
     @Override
-    public <T extends DataObject> ListenableFuture<Optional<T>> read(final LogicalDatastoreType store,
-            final InstanceIdentifier<T> path) {
+    public <T extends DataObject> CheckedFuture<Optional<T>,ReadFailedException> read(
+            final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
         return doRead(getDelegate(), store, path);
     }
 }
\ No newline at end of file
index 3449c54..e46fe1f 100644 (file)
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>concepts</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>util</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-common</artifactId>
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/ReadFailedException.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/ReadFailedException.java
new file mode 100644 (file)
index 0000000..b0a7807
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.api.data;
+
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
+import org.opendaylight.yangtools.yang.common.OperationFailedException;
+import org.opendaylight.yangtools.yang.common.RpcError;
+
+/**
+ * An exception for a failed read.
+ */
+public class ReadFailedException extends OperationFailedException {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final ExceptionMapper<ReadFailedException> MAPPER =
+            new ExceptionMapper<ReadFailedException>("read", ReadFailedException.class) {
+                @Override
+                protected ReadFailedException newWithCause(String message, Throwable cause) {
+                    return new ReadFailedException(message, cause);
+                }
+    };
+
+    public ReadFailedException(String message, RpcError... errors) {
+        super(message, errors);
+    }
+
+    public ReadFailedException(String message, Throwable cause, RpcError... errors) {
+        super(message, cause, errors);
+    }
+}
index 18a857e..7ac76e4 100644 (file)
@@ -7,14 +7,8 @@
  */
 package org.opendaylight.controller.md.sal.common.api.data;
 
-import java.util.Arrays;
-import java.util.List;
-
+import org.opendaylight.yangtools.yang.common.OperationFailedException;
 import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-
-import com.google.common.collect.ImmutableList;
 
 /**
  *
@@ -24,41 +18,16 @@ import com.google.common.collect.ImmutableList;
  * failed.
  *
  */
-public class TransactionCommitFailedException extends Exception {
+public class TransactionCommitFailedException extends OperationFailedException {
 
     private static final long serialVersionUID = 1L;
 
-    private final List<RpcError> errorList;
-
     public TransactionCommitFailedException(final String message, final RpcError... errors) {
         this(message, null, errors);
     }
 
     public TransactionCommitFailedException(final String message, final Throwable cause,
                                             final RpcError... errors) {
-        super(message, cause);
-
-        if( errors != null && errors.length > 0 ) {
-            errorList = ImmutableList.<RpcError>builder().addAll( Arrays.asList( errors ) ).build();
-        }
-        else {
-            // Add a default RpcError.
-            errorList = ImmutableList.of(RpcResultBuilder.newError(ErrorType.APPLICATION, null,
-                    getMessage(), null, null, getCause()));
-        }
-    }
-
-    /**
-     * Returns additional error information about this exception.
-     *
-     * @return a List of RpcErrors. There is always at least one RpcError.
-     */
-    public List<RpcError> getErrorList() {
-        return errorList;
-    }
-
-    @Override
-    public String getMessage() {
-        return new StringBuilder( super.getMessage() ).append(", errors: ").append( errorList ).toString();
+        super(message, cause, errors);
     }
 }
index 9108f86..e42c86a 100644 (file)
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-common-api</artifactId>
index 2ef8e5f..d21ea51 100644 (file)
@@ -8,8 +8,11 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.concurrent.Executors;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -30,8 +33,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  *
@@ -41,6 +44,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private static final Logger
         LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
+    private static final int DEFAULT_EXECUTOR_POOL_SIZE = 10;
 
     private final String type;
     private final ActorContext actorContext;
@@ -55,10 +59,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
      * This is typically used when we need to make a request to an actor and
      * wait for it's response and the consumer needs to be provided a Future.
      *
-     * FIXME : Make the thread pool configurable
+     * FIXME : Make the thread pool size configurable.
      */
-    private final ExecutorService executor =
-        Executors.newFixedThreadPool(10);
+    private final ListeningExecutorService executor =
+        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_EXECUTOR_POOL_SIZE));
 
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
         this(new ActorContext(actorSystem, actorSystem
index 999d0f8..a7089a7 100644 (file)
@@ -15,9 +15,11 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.serialization.Serialization;
+
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -74,7 +76,7 @@ public class Shard extends RaftActor {
 
     private final String name;
 
-    private SchemaContext schemaContext;
+    private volatile SchemaContext schemaContext;
 
     private final ShardStats shardMBean;
 
index b56dc94..5622065 100644 (file)
@@ -10,8 +10,10 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
+
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -29,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
@@ -42,14 +43,14 @@ public class ThreePhaseCommitCohortProxy implements
 
     private final ActorContext actorContext;
     private final List<ActorPath> cohortPaths;
-    private final ExecutorService executor;
+    private final ListeningExecutorService executor;
     private final String transactionId;
 
 
     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
         List<ActorPath> cohortPaths,
         String transactionId,
-        ExecutorService executor) {
+        ListeningExecutorService executor) {
 
         this.actorContext = actorContext;
         this.cohortPaths = cohortPaths;
@@ -58,42 +59,38 @@ public class ThreePhaseCommitCohortProxy implements
     }
 
     @Override public ListenableFuture<Boolean> canCommit() {
-        Callable<Boolean> call = new Callable() {
-
-            @Override public Boolean call() throws Exception {
-            for(ActorPath actorPath : cohortPaths){
-                ActorSelection cohort = actorContext.actorSelection(actorPath);
-
-                try {
-                    Object response =
-                        actorContext.executeRemoteOperation(cohort,
-                            new CanCommitTransaction().toSerializable(),
-                            ActorContext.ASK_DURATION);
-
-                    if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
-                        CanCommitTransactionReply reply =
-                            CanCommitTransactionReply.fromSerializable(response);
-                        if (!reply.getCanCommit()) {
-                            return false;
+        Callable<Boolean> call = new Callable<Boolean>() {
+
+            @Override
+            public Boolean call() throws Exception {
+                for(ActorPath actorPath : cohortPaths){
+                    ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+                    try {
+                        Object response =
+                                actorContext.executeRemoteOperation(cohort,
+                                        new CanCommitTransaction().toSerializable(),
+                                        ActorContext.ASK_DURATION);
+
+                        if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+                            CanCommitTransactionReply reply =
+                                    CanCommitTransactionReply.fromSerializable(response);
+                            if (!reply.getCanCommit()) {
+                                System.out.println("**TOM - failed: false");
+                                return false;
+                            }
                         }
+                    } catch(RuntimeException e){
+                        LOG.error("Unexpected Exception", e);
+                        return false;
                     }
-                } catch(RuntimeException e){
-                    LOG.error("Unexpected Exception", e);
-                    return false;
                 }
 
-
-            }
-            return true;
+                return true;
             }
         };
 
-        ListenableFutureTask<Boolean>
-            future = ListenableFutureTask.create(call);
-
-        executor.submit(future);
-
-        return future;
+        return executor.submit(call);
     }
 
     @Override public ListenableFuture<Void> preCommit() {
@@ -138,13 +135,7 @@ public class ThreePhaseCommitCohortProxy implements
             }
         };
 
-        ListenableFutureTask<Void>
-            future = ListenableFutureTask.create(call);
-
-        executor.submit(future);
-
-        return future;
-
+        return executor.submit(call);
     }
 
     public List<ActorPath> getCohortPaths() {
index 2e8538d..5e9defa 100644 (file)
@@ -15,17 +15,18 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-import java.util.concurrent.ExecutorService;
+import com.google.common.util.concurrent.ListeningExecutorService;
 
 /**
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
  */
 public class TransactionChainProxy implements DOMStoreTransactionChain{
     private final ActorContext actorContext;
-    private final ExecutorService transactionExecutor;
+    private final ListeningExecutorService transactionExecutor;
     private final SchemaContext schemaContext;
 
-    public TransactionChainProxy(ActorContext actorContext, ExecutorService transactionExecutor, SchemaContext schemaContext) {
+    public TransactionChainProxy(ActorContext actorContext, ListeningExecutorService transactionExecutor,
+            SchemaContext schemaContext) {
         this.actorContext = actorContext;
         this.transactionExecutor = transactionExecutor;
         this.schemaContext = schemaContext;
index c85d320..cbd61b2 100644 (file)
@@ -12,10 +12,12 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
+
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -29,8 +31,10 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -42,7 +46,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -74,13 +77,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final ActorContext actorContext;
     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
     private final String identifier;
-    private final ExecutorService executor;
+    private final ListeningExecutorService executor;
     private final SchemaContext schemaContext;
 
     public TransactionProxy(
         ActorContext actorContext,
         TransactionType transactionType,
-        ExecutorService executor,
+        ListeningExecutorService executor,
         SchemaContext schemaContext
     ) {
 
@@ -94,7 +97,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     @Override
-    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final YangInstanceIdentifier path) {
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+            final YangInstanceIdentifier path) {
 
         createTransactionIfMissing(actorContext, path);
 
@@ -197,6 +201,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 remoteTransactionPaths.put(shardName, transactionContext);
             }
         } catch(TimeoutException e){
+            LOG.warn("Timed out trying to create transaction on shard {}: {}", shardName, e);
             remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName));
         }
     }
@@ -214,7 +219,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
-        ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(final YangInstanceIdentifier path);
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+                final YangInstanceIdentifier path);
 
         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
     }
@@ -266,9 +272,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
         }
 
-        @Override public ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(final YangInstanceIdentifier path) {
+        @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+                final YangInstanceIdentifier path) {
 
-            Callable<Optional<NormalizedNode<?,?>>> call = new Callable() {
+            Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
 
                 @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
                     Object response = actorContext
@@ -279,20 +286,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         if(reply.getNormalizedNode() == null){
                             return Optional.absent();
                         }
-                        //FIXME : A cast should not be required here ???
-                        return (Optional<NormalizedNode<?, ?>>) Optional.of(reply.getNormalizedNode());
+                        return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
                     }
 
                     return Optional.absent();
                 }
             };
 
-            ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
-                future = ListenableFutureTask.create(call);
-
-            executor.submit(future);
-
-            return future;
+            return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
         }
 
         @Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
@@ -342,10 +343,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         @Override
-        public ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(
+        public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
             YangInstanceIdentifier path) {
             LOG.error("readData called path = {}", path);
-            return Futures.immediateFuture(
+            return Futures.immediateCheckedFuture(
                 Optional.<NormalizedNode<?, ?>>absent());
         }
 
index b5e3d24..5f4ac57 100644 (file)
@@ -2,8 +2,10 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -19,6 +21,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
@@ -52,7 +56,9 @@ public class DistributedDataStoreIntegrationTest{
 
         distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
 
-        Thread.sleep(1000);
+        // This sleep is fragile - test can fail intermittently if all Shards aren't updated with
+        // the SchemaContext in time. Is there any way we can make this deterministic?
+        Thread.sleep(2000);
 
         DOMStoreReadWriteTransaction transaction =
             distributedDataStore.newReadWriteTransaction();
@@ -72,22 +78,21 @@ public class DistributedDataStoreIntegrationTest{
 
         ListenableFuture<Boolean> canCommit = ready.canCommit();
 
-        assertTrue(canCommit.get());
+        assertTrue(canCommit.get(5, TimeUnit.SECONDS));
 
         ListenableFuture<Void> preCommit = ready.preCommit();
 
-        preCommit.get();
+        preCommit.get(5, TimeUnit.SECONDS);
 
         ListenableFuture<Void> commit = ready.commit();
 
-        commit.get();
-
+        commit.get(5, TimeUnit.SECONDS);
     }
 
 
     @Test
     public void integrationTestWithMultiShardConfiguration()
-        throws ExecutionException, InterruptedException {
+        throws ExecutionException, InterruptedException, TimeoutException {
         Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
 
         ShardStrategyFactory.setConfiguration(configuration);
@@ -97,7 +102,9 @@ public class DistributedDataStoreIntegrationTest{
 
         distributedDataStore.onGlobalContextUpdated(SchemaContextHelper.full());
 
-        Thread.sleep(1000);
+        // This sleep is fragile - test can fail intermittently if all Shards aren't updated with
+        // the SchemaContext in time. Is there any way we can make this deterministic?
+        Thread.sleep(2000);
 
         DOMStoreReadWriteTransaction transaction =
             distributedDataStore.newReadWriteTransaction();
@@ -109,16 +116,15 @@ public class DistributedDataStoreIntegrationTest{
 
         ListenableFuture<Boolean> canCommit = ready.canCommit();
 
-        assertTrue(canCommit.get());
+        assertTrue(canCommit.get(5, TimeUnit.SECONDS));
 
         ListenableFuture<Void> preCommit = ready.preCommit();
 
-        preCommit.get();
+        preCommit.get(5, TimeUnit.SECONDS);
 
         ListenableFuture<Void> commit = ready.commit();
 
-        commit.get();
-
+        commit.get(5, TimeUnit.SECONDS);
     }
 
 }
index 992518e..4eca567 100644 (file)
@@ -2,8 +2,14 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
 import junit.framework.Assert;
+
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -14,7 +20,6 @@ import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor
 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
 
 import java.util.Arrays;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.junit.Assert.assertNotNull;
@@ -25,7 +30,8 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     private Props props;
     private ActorRef actorRef;
     private MockActorContext actorContext;
-    private ExecutorService executor = Executors.newSingleThreadExecutor();
+    private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
+                        Executors.newSingleThreadExecutor());
 
     @Before
     public void setUp(){
@@ -39,6 +45,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
     }
 
+    @After
+    public void tearDown() {
+        executor.shutdownNow();
+    }
+
     @Test
     public void testCanCommit() throws Exception {
         actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable());
index f654e3a..7d9d2da 100644 (file)
@@ -2,9 +2,15 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
 import junit.framework.Assert;
+
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -28,7 +34,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 public class TransactionProxyTest extends AbstractActorTest {
@@ -38,14 +43,19 @@ public class TransactionProxyTest extends AbstractActorTest {
     private final ActorContext testContext =
         new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );
 
-    private ExecutorService transactionExecutor =
-        Executors.newSingleThreadExecutor();
+    private final ListeningExecutorService transactionExecutor =
+        MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
 
     @Before
     public void setUp(){
         ShardStrategyFactory.setConfiguration(configuration);
     }
 
+    @After
+    public void tearDown() {
+        transactionExecutor.shutdownNow();
+    }
+
     @Test
     public void testRead() throws Exception {
         final Props props = Props.create(DoNothingActor.class);
index afa2286..fc251c8 100644 (file)
@@ -9,11 +9,12 @@ package org.opendaylight.controller.md.sal.dom.api;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncReadTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  * A transaction that provides read access to a logical data store.
@@ -33,14 +34,17 @@ public interface DOMDataReadTransaction extends AsyncReadTransaction<YangInstanc
      * @param path
      *            Path which uniquely identifies subtree which client want to
      *            read
-     * @return Listenable Future which contains read result
+     * @return a CheckFuture containing the result of the read. The Future blocks until the
+     *         commit operation is complete. Once complete:
      *         <ul>
-     *         <li>If data at supplied path exists the
-     *         {@link ListeblaFuture#get()} returns Optional object containing
-     *         data once read is done.
-     *         <li>If data at supplied path does not exists the
-     *         {@link ListenbleFuture#get()} returns {@link Optional#absent()}.
+     *         <li>If the data at the supplied path exists, the Future returns an Optional object
+     *         containing the data.</li>
+     *         <li>If the data at the supplied path does not exist, the Future returns
+     *         Optional#absent().</li>
+     *         <li>If the read of the data fails, the Future will fail with a
+     *         {@link ReadFailedException} or an exception derived from ReadFailedException.</li>
      *         </ul>
      */
-    ListenableFuture<Optional<NormalizedNode<?,?>>> read(LogicalDatastoreType store,YangInstanceIdentifier path);
+    CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+            LogicalDatastoreType store, YangInstanceIdentifier path);
 }
index 8b9eb44..9a6d12f 100644 (file)
@@ -15,6 +15,7 @@ import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,7 +92,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
         }
 
-        return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+        return MappingCheckedFuture.create(commitFuture,
+                TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
     }
 
     /**
@@ -285,7 +287,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
              */
             @SuppressWarnings({ "unchecked", "rawtypes" })
             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
-            return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
+            return MappingCheckedFuture.create(compositeResult,
+                                         TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
         }
 
         /**
@@ -316,7 +319,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
              */
             @SuppressWarnings({ "unchecked", "rawtypes" })
             ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
-            return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+            return MappingCheckedFuture.create(compositeResult,
+                                     TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
         }
 
         /**
@@ -342,8 +346,8 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
             }
             ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
             ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
-            return Futures
-                    .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
+            return MappingCheckedFuture.create(allSuccessFuture,
+                                       TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
 
         }
 
index c8edcbc..b4562cf 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -15,7 +16,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  *
@@ -34,8 +35,8 @@ class DOMForwardedReadOnlyTransaction extends
     }
 
     @Override
-    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
-            final YangInstanceIdentifier path) {
+    public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+            final LogicalDatastoreType store, final YangInstanceIdentifier path) {
         return getSubtransaction(store).read(path);
     }
 
index e6521b2..74a4c52 100644 (file)
@@ -7,6 +7,7 @@
  */package org.opendaylight.controller.md.sal.dom.broker.impl;
 
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -14,7 +15,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  *
@@ -45,8 +46,8 @@ class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMS
     }
 
     @Override
-    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
-            final YangInstanceIdentifier path) {
+    public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+            final LogicalDatastoreType store, final YangInstanceIdentifier path) {
         return getSubtransaction(store).read(path);
     }
 }
\ No newline at end of file
index 258b068..799a8a0 100644 (file)
@@ -7,29 +7,16 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import java.util.concurrent.ExecutionException;
-
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
 
 /**
+ * Utility exception mapper which translates Exception to {@link TransactionCommitFailedException}.
  *
- * Utility exception mapper which translates {@link Exception}
- * to {@link TransactionCommitFailedException}.
- *
- * This mapper is intended to be used with {@link com.google.common.util.concurrent.Futures#makeChecked(com.google.common.util.concurrent.ListenableFuture, Function)}
- * <ul>
- * <li>if exception is {@link TransactionCommitFailedException} or one of its subclasses returns original exception.
- * <li>if exception is {@link ExecutionException} and cause is  {@link TransactionCommitFailedException} return cause
- * <li>otherwise returns {@link TransactionCommitFailedException} with original exception as a cause.
- * </ul>
- *
+ * @see ExceptionMapper
  */
-
-final class TransactionCommitFailedExceptionMapper implements
-        Function<Exception, TransactionCommitFailedException> {
+final class TransactionCommitFailedExceptionMapper
+                           extends ExceptionMapper<TransactionCommitFailedException> {
 
     static final TransactionCommitFailedExceptionMapper PRE_COMMIT_MAPPER = create("canCommit");
 
@@ -37,10 +24,8 @@ final class TransactionCommitFailedExceptionMapper implements
 
     static final TransactionCommitFailedExceptionMapper COMMIT_ERROR_MAPPER = create("commit");
 
-    private final String opName;
-
     private TransactionCommitFailedExceptionMapper(final String opName) {
-        this.opName = Preconditions.checkNotNull(opName);
+        super( opName, TransactionCommitFailedException.class );
     }
 
     public static final TransactionCommitFailedExceptionMapper create(final String opName) {
@@ -48,22 +33,7 @@ final class TransactionCommitFailedExceptionMapper implements
     }
 
     @Override
-    public TransactionCommitFailedException apply(final Exception e) {
-        // If excetion is TransactionCommitFailedException
-        // we reuse it directly.
-        if (e instanceof TransactionCommitFailedException) {
-            return (TransactionCommitFailedException) e;
-        }
-        // If error is ExecutionException which was caused by cause of
-        // TransactionCommitFailedException
-        // we reuse original cause
-        if (e instanceof ExecutionException && e.getCause() instanceof TransactionCommitFailedException) {
-            return (TransactionCommitFailedException) e.getCause();
-        }
-        if (e instanceof InterruptedException) {
-            return new TransactionCommitFailedException(opName + " failed - DOMStore was interupted.", e);
-        }
-        // Otherwise we are using new exception, with original cause
-        return new TransactionCommitFailedException(opName + " failed", e);
+    protected TransactionCommitFailedException newWithCause( String message, Throwable cause ) {
+        return new TransactionCommitFailedException( message, cause );
     }
 }
\ No newline at end of file
index 61ea47e..5bd8a7b 100644 (file)
@@ -15,16 +15,20 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import javax.annotation.Nullable;
+
 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
@@ -380,7 +384,8 @@ public class BackwardsCompatibleMountPoint implements MountProvisionInstance, Sc
             }
 
             @Override
-            public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+            public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+                    final LogicalDatastoreType store, final YangInstanceIdentifier path) {
 
                 CompositeNode rawData = null;
 
@@ -398,7 +403,7 @@ public class BackwardsCompatibleMountPoint implements MountProvisionInstance, Sc
 
                 final Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalized = normalizer.toNormalized(path, rawData);
                 final Optional<NormalizedNode<?, ?>> normalizedNodeOptional = Optional.<NormalizedNode<?, ?>>fromNullable(normalized.getValue());
-                return com.google.common.util.concurrent.Futures.immediateFuture(normalizedNodeOptional);
+                return Futures.immediateCheckedFuture(normalizedNodeOptional);
             }
         }
 
@@ -508,7 +513,8 @@ public class BackwardsCompatibleMountPoint implements MountProvisionInstance, Sc
             }
 
             @Override
-            public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+            public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+                    final LogicalDatastoreType store, final YangInstanceIdentifier path) {
                 return new BackwardsCompatibleReadTransaction(dataReader, dataNormalizer).read(store, path);
             }
 
index ae1b3ee..84d09c7 100644 (file)
@@ -7,29 +7,31 @@
  */
 package org.opendaylight.controller.sal.core.spi.data;
 
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
 
 public interface DOMStoreReadTransaction extends DOMStoreTransaction {
 
     /**
-     *
      * Reads data from provided logical data store located at provided path
      *
-     *
      * @param path
      *            Path which uniquely identifies subtree which client want to
      *            read
-     * @return Listenable Future which contains read result
+     * @return a CheckFuture containing the result of the read. The Future blocks until the
+     *         commit operation is complete. Once complete:
      *         <ul>
-     *         <li>If data at supplied path exists the {@link java.util.concurrent.Future#get()}
-     *         returns Optional object containing data
-     *         <li>If data at supplied path does not exists the
-     *         {@link java.util.concurrent.Future#get()} returns {@link Optional#absent()}.
+     *         <li>If the data at the supplied path exists, the Future returns an Optional object
+     *         containing the data.</li>
+     *         <li>If the data at the supplied path does not exist, the Future returns
+     *         Optional#absent().</li>
+     *         <li>If the read of the data fails, the Future will fail with a
+     *         {@link ReadFailedException} or an exception derived from ReadFailedException.</li>
      *         </ul>
      */
-    ListenableFuture<Optional<NormalizedNode<?,?>>> read(YangInstanceIdentifier path);
+    CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(YangInstanceIdentifier path);
 }
index 9068474..725b24c 100644 (file)
       <artifactId>binding-generator-impl</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>mockito-configuration</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-test-model</artifactId>
       <scope>test</scope>
-      <version>${project.version}</version>
-    </dependency>
+      </dependency>
   </dependencies>
 
   <build>
index 39d6483..2a98406 100644 (file)
@@ -8,9 +8,8 @@
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -19,8 +18,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  *
@@ -30,8 +29,9 @@ import com.google.common.util.concurrent.ListenableFuture;
  * which delegates most of its calls to similar methods provided by underlying snapshot.
  *
  */
-final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements
-DOMStoreReadTransaction {
+final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction
+                                          implements DOMStoreReadTransaction {
+
     private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadTransaction.class);
     private DataTreeSnapshot stableSnapshot;
 
@@ -48,9 +48,19 @@ DOMStoreReadTransaction {
     }
 
     @Override
-    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final YangInstanceIdentifier path) {
+    public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+        LOG.debug("Tx: {} Read: {}", getIdentifier(), path);
         checkNotNull(path, "Path must not be null.");
-        checkState(stableSnapshot != null, "Transaction is closed");
-        return Futures.immediateFuture(stableSnapshot.readNode(path));
+
+        if(stableSnapshot == null) {
+            return Futures.immediateFailedCheckedFuture(new ReadFailedException("Transaction is closed"));
+        }
+
+        try {
+            return Futures.immediateCheckedFuture(stableSnapshot.readNode(path));
+        } catch (Exception e) {
+            LOG.error("Tx: {} Failed Read of {}", getIdentifier(), path, e);
+            return Futures.immediateFailedCheckedFuture(new ReadFailedException("Read failed",e));
+        }
     }
 }
\ No newline at end of file
index ec17d7a..5c5e9c6 100644 (file)
@@ -7,7 +7,11 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -15,16 +19,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * Implementation of Read-Write transaction which is backed by {@link DataTreeSnapshot}
  * and executed according to {@link TransactionReadyPrototype}.
  *
  */
-class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements
-DOMStoreReadWriteTransaction {
+class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction
+                                         implements DOMStoreReadWriteTransaction {
 
     private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadWriteTransaction.class);
 
@@ -41,13 +45,20 @@ DOMStoreReadWriteTransaction {
     }
 
     @Override
-    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final YangInstanceIdentifier path) {
+    public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
         LOG.debug("Tx: {} Read: {}", getIdentifier(), path);
+        checkNotNull(path, "Path must not be null.");
+
+        DataTreeModification dataView = getMutatedView();
+        if(dataView == null) {
+            return Futures.immediateFailedCheckedFuture(new ReadFailedException("Transaction is closed"));
+        }
+
         try {
-            return Futures.immediateFuture(getMutatedView().readNode(path));
+            return Futures.immediateCheckedFuture(dataView.readNode(path));
         } catch (Exception e) {
             LOG.error("Tx: {} Failed Read of {}", getIdentifier(), path, e);
-            throw e;
+            return Futures.immediateFailedCheckedFuture(new ReadFailedException("Read failed",e));
         }
     }
 }
\ No newline at end of file
index 96369de..9b105aa 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -16,12 +17,22 @@ import java.util.concurrent.ExecutionException;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import com.google.common.base.Optional;
@@ -31,226 +42,373 @@ import com.google.common.util.concurrent.MoreExecutors;
 
 public class InMemoryDataStoreTest {
 
-  private SchemaContext schemaContext;
-  private InMemoryDOMDataStore domStore;
+    private SchemaContext schemaContext;
+    private InMemoryDOMDataStore domStore;
 
-  @Before
-  public void setupStore() {
-    domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
-    schemaContext = TestModel.createTestContext();
-    domStore.onGlobalContextUpdated(schemaContext);
+    @Before
+    public void setupStore() {
+        domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
+        schemaContext = TestModel.createTestContext();
+        domStore.onGlobalContextUpdated(schemaContext);
+    }
 
-  }
+    @Test
+    public void testTransactionIsolation() throws InterruptedException, ExecutionException {
 
-  @Test
-  public void testTransactionIsolation() throws InterruptedException, ExecutionException {
+        assertNotNull(domStore);
 
-    assertNotNull(domStore);
+        DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction();
+        assertNotNull(readTx);
 
-    DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction();
-    assertNotNull(readTx);
+        DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
+        assertNotNull(writeTx);
 
-    DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
-    assertNotNull(writeTx);
-    /**
-     *
-     * Writes /test in writeTx
-     *
-     */
-    writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        /**
+         * Writes /test in writeTx
+         */
+        NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        writeTx.write(TestModel.TEST_PATH, testNode);
 
-    /**
-     *
-     * Reads /test from writeTx Read should return container.
-     *
-     */
-    ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
-    assertTrue(writeTxContainer.get().isPresent());
+        /**
+         * Reads /test from writeTx Read should return container.
+         */
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
+        assertEquals("read: isPresent", true, writeTxContainer.get().isPresent());
+        assertEquals("read: data", testNode, writeTxContainer.get().get());
 
-    /**
-     *
-     * Reads /test from readTx Read should return Absent.
-     *
-     */
-    ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx.read(TestModel.TEST_PATH);
-    assertFalse(readTxContainer.get().isPresent());
-  }
+        /**
+         * Reads /test from readTx Read should return Absent.
+         */
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx.read(TestModel.TEST_PATH);
+        assertEquals("read: isPresent", false, readTxContainer.get().isPresent());
+    }
 
-  @Test
-  public void testTransactionCommit() throws InterruptedException, ExecutionException {
+    @Test
+    public void testTransactionCommit() throws InterruptedException, ExecutionException {
 
-    DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
-    assertNotNull(writeTx);
-    /**
-     *
-     * Writes /test in writeTx
-     *
-     */
-    writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
+        assertNotNull(writeTx);
 
-    /**
-     *
-     * Reads /test from writeTx Read should return container.
-     *
-     */
-    ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
-    assertTrue(writeTxContainer.get().isPresent());
+        /**
+         * Writes /test in writeTx
+         */
+        NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        writeTx.write(TestModel.TEST_PATH, testNode);
 
-    DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+        /**
+         * Reads /test from writeTx Read should return container.
+         */
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(TestModel.TEST_PATH);
+        assertEquals("read: isPresent", true, writeTxContainer.get().isPresent());
+        assertEquals("read: data", testNode, writeTxContainer.get().get());
 
-    assertThreePhaseCommit(cohort);
+        DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
 
-    Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
-        .get();
-    assertTrue(afterCommitRead.isPresent());
-  }
+        assertThreePhaseCommit(cohort);
 
-  @Test
-  public void testTransactionAbort() throws InterruptedException, ExecutionException {
+        Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
+                .get();
+        assertEquals("After commit read: isPresent", true, afterCommitRead.isPresent());
+        assertEquals("After commit read: data", testNode, afterCommitRead.get());
+    }
 
-    DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
-    assertNotNull(writeTx);
+    @Test
+    public void testDelete() throws Exception {
 
-    assertTestContainerWrite(writeTx);
+        DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction();
+        assertNotNull( writeTx );
 
-    DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+        // Write /test and commit
 
-    assertTrue(cohort.canCommit().get().booleanValue());
-    cohort.preCommit().get();
-    cohort.abort().get();
+        writeTx.write( TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
 
-    Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
-        .get();
-    assertFalse(afterCommitRead.isPresent());
-  }
+        assertThreePhaseCommit( writeTx.ready() );
 
-  @Test
-  public void testTransactionChain() throws InterruptedException, ExecutionException {
-    DOMStoreTransactionChain txChain = domStore.createTransactionChain();
-    assertNotNull(txChain);
+        Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().
+                read(TestModel.TEST_PATH ).get();
+        assertEquals( "After commit read: isPresent", true, afterCommitRead.isPresent() );
 
-    /**
-     * We alocate new read-write transaction and write /test
-     *
-     *
-     */
-    DOMStoreReadWriteTransaction firstTx = txChain.newReadWriteTransaction();
-    assertTestContainerWrite(firstTx);
+        // Delete /test and verify
 
-    /**
-     * First transaction is marked as ready, we are able to allocate chained
-     * transactions
-     */
-    DOMStoreThreePhaseCommitCohort firstWriteTxCohort = firstTx.ready();
+        writeTx = domStore.newWriteOnlyTransaction();
 
-    /**
-     * We alocate chained transaction - read transaction, note first one is
-     * still not commited to datastore.
-     */
-    DOMStoreReadTransaction secondReadTx = txChain.newReadOnlyTransaction();
+        writeTx.delete( TestModel.TEST_PATH );
 
-    /**
-     *
-     * We test if we are able to read data from tx, read should not fail
-     * since we are using chained transaction.
-     *
-     *
-     */
-    assertTestContainerExists(secondReadTx);
+        assertThreePhaseCommit( writeTx.ready() );
 
-    /**
-     *
-     * We alocate next transaction, which is still based on first one, but
-     * is read-write.
-     *
-     */
-    DOMStoreReadWriteTransaction thirdDeleteTx = txChain.newReadWriteTransaction();
+        afterCommitRead = domStore.newReadOnlyTransaction().
+                read(TestModel.TEST_PATH ).get();
+        assertEquals( "After commit read: isPresent", false, afterCommitRead.isPresent() );
+    }
 
-    /**
-     * We test existence of /test in third transaction container should
-     * still be visible from first one (which is still uncommmited).
-     *
-     *
-     */
-    assertTestContainerExists(thirdDeleteTx);
+    @Test
+    public void testMerge() throws Exception {
 
-    /**
-     * We delete node in third transaction
-     */
-    thirdDeleteTx.delete(TestModel.TEST_PATH);
+        DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction();
+        assertNotNull( writeTx );
 
-    /**
-     * third transaction is sealed.
-     */
-    DOMStoreThreePhaseCommitCohort thirdDeleteTxCohort = thirdDeleteTx.ready();
+        ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier( new NodeIdentifier( TestModel.TEST_QNAME ) )
+                .addChild( ImmutableNodes.mapNodeBuilder( TestModel.OUTER_LIST_QNAME )
+                        .addChild( ImmutableNodes.mapEntry( TestModel.OUTER_LIST_QNAME,
+                                                            TestModel.ID_QNAME, 1 ) ).build() ).build();
 
-    /**
-     * We commit first transaction
-     *
-     */
-    assertThreePhaseCommit(firstWriteTxCohort);
+        writeTx.merge( TestModel.TEST_PATH, containerNode );
 
-    // Alocates store transacion
-    DOMStoreReadTransaction storeReadTx = domStore.newReadOnlyTransaction();
-    /**
-     * We verify transaction is commited to store, container should exists
-     * in datastore.
-     */
-    assertTestContainerExists(storeReadTx);
-    /**
-     * We commit third transaction
-     *
-     */
-    assertThreePhaseCommit(thirdDeleteTxCohort);
-  }
+        assertThreePhaseCommit( writeTx.ready() );
 
-  @Test
-  @Ignore
-  public void testTransactionConflict() throws InterruptedException, ExecutionException {
-    DOMStoreReadWriteTransaction txOne = domStore.newReadWriteTransaction();
-    DOMStoreReadWriteTransaction txTwo = domStore.newReadWriteTransaction();
-    assertTestContainerWrite(txOne);
-    assertTestContainerWrite(txTwo);
+        Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().
+                read(TestModel.TEST_PATH ).get();
+        assertEquals( "After commit read: isPresent", true, afterCommitRead.isPresent() );
+        assertEquals( "After commit read: data", containerNode, afterCommitRead.get() );
 
-    /**
-     * Commits transaction
-     */
-    assertThreePhaseCommit(txOne.ready());
+        // Merge a new list entry node
 
-    /**
-     * Asserts that txTwo could not be commited
-     */
-    assertFalse(txTwo.ready().canCommit().get());
-  }
-
-  private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort)
-      throws InterruptedException, ExecutionException {
-    assertTrue(cohort.canCommit().get().booleanValue());
-    cohort.preCommit().get();
-    cohort.commit().get();
-  }
-
-  private static Optional<NormalizedNode<?, ?>> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx)
-      throws InterruptedException, ExecutionException {
-    /**
-     *
-     * Writes /test in writeTx
-     *
-     */
-    writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        writeTx = domStore.newWriteOnlyTransaction();
+        assertNotNull( writeTx );
+
+        containerNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier( new NodeIdentifier( TestModel.TEST_QNAME ) )
+                .addChild( ImmutableNodes.mapNodeBuilder( TestModel.OUTER_LIST_QNAME )
+                        .addChild( ImmutableNodes.mapEntry( TestModel.OUTER_LIST_QNAME,
+                                                            TestModel.ID_QNAME, 1 ) )
+                        .addChild( ImmutableNodes.mapEntry( TestModel.OUTER_LIST_QNAME,
+                                                            TestModel.ID_QNAME, 2 ) ).build() ).build();
+
+        writeTx.merge( TestModel.TEST_PATH, containerNode );
+
+        assertThreePhaseCommit( writeTx.ready() );
+
+        afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH ).get();
+        assertEquals( "After commit read: isPresent", true, afterCommitRead.isPresent() );
+        assertEquals( "After commit read: data", containerNode, afterCommitRead.get() );
+    }
+
+    @Test(expected=ReadFailedException.class)
+    public void testReadWithReadOnlyTransactionClosed() throws Throwable {
+
+        DOMStoreReadTransaction readTx = domStore.newReadOnlyTransaction();
+        assertNotNull( readTx );
+
+        readTx.close();
+
+        doReadAndThrowEx( readTx );
+    }
+
+    @Test(expected=ReadFailedException.class)
+    public void testReadWithReadOnlyTransactionFailure() throws Throwable {
+
+        DataTreeSnapshot mockSnapshot = Mockito.mock( DataTreeSnapshot.class );
+        Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockSnapshot )
+        .readNode( Mockito.any( YangInstanceIdentifier.class ) );
+
+        DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction( "1", mockSnapshot );
+
+        doReadAndThrowEx( readTx );
+    }
 
-    return assertTestContainerExists(writeTx);
-  }
+    @Test(expected=ReadFailedException.class)
+    public void testReadWithReadWriteTransactionClosed() throws Throwable {
 
-  /**
-   * Reads /test from readTx Read should return container.
-   */
-  private static Optional<NormalizedNode<?, ?>> assertTestContainerExists(final DOMStoreReadTransaction readTx)
-      throws InterruptedException, ExecutionException {
+        DOMStoreReadTransaction readTx = domStore.newReadWriteTransaction();
+        assertNotNull( readTx );
+
+        readTx.close();
+
+        doReadAndThrowEx( readTx );
+    }
+
+    @Test(expected=ReadFailedException.class)
+    public void testReadWithReadWriteTransactionFailure() throws Throwable {
+
+        DataTreeSnapshot mockSnapshot = Mockito.mock( DataTreeSnapshot.class );
+        DataTreeModification mockModification = Mockito.mock( DataTreeModification.class );
+        Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockModification )
+        .readNode( Mockito.any( YangInstanceIdentifier.class ) );
+        Mockito.doReturn( mockModification ).when( mockSnapshot ).newModification();
+        TransactionReadyPrototype mockReady = Mockito.mock( TransactionReadyPrototype.class );
+        DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction( "1", mockSnapshot, mockReady );
+
+        doReadAndThrowEx( readTx );
+    }
+
+    private void doReadAndThrowEx( DOMStoreReadTransaction readTx ) throws Throwable {
+
+        try {
+            readTx.read(TestModel.TEST_PATH).get();
+        } catch( ExecutionException e ) {
+            throw e.getCause();
+        }
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testWriteWithTransactionReady() throws Exception {
+
+        DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction();
+
+        writeTx.ready();
+
+        // Should throw ex
+        writeTx.write( TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testReadyWithTransactionAlreadyReady() throws Exception {
+
+        DOMStoreWriteTransaction writeTx = domStore.newWriteOnlyTransaction();
+
+        writeTx.ready();
+
+        // Should throw ex
+        writeTx.ready();
+    }
+
+    @Test
+    public void testTransactionAbort() throws InterruptedException, ExecutionException {
+
+        DOMStoreReadWriteTransaction writeTx = domStore.newReadWriteTransaction();
+        assertNotNull(writeTx);
+
+        assertTestContainerWrite(writeTx);
+
+        DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+
+        assertTrue(cohort.canCommit().get().booleanValue());
+        cohort.preCommit().get();
+        cohort.abort().get();
+
+        Optional<NormalizedNode<?, ?>> afterCommitRead = domStore.newReadOnlyTransaction().read(TestModel.TEST_PATH)
+                .get();
+        assertFalse(afterCommitRead.isPresent());
+    }
+
+    @Test
+    public void testTransactionChain() throws InterruptedException, ExecutionException {
+        DOMStoreTransactionChain txChain = domStore.createTransactionChain();
+        assertNotNull(txChain);
+
+        /**
+         * We alocate new read-write transaction and write /test
+         *
+         *
+         */
+        DOMStoreReadWriteTransaction firstTx = txChain.newReadWriteTransaction();
+        assertTestContainerWrite(firstTx);
+
+        /**
+         * First transaction is marked as ready, we are able to allocate chained
+         * transactions
+         */
+        DOMStoreThreePhaseCommitCohort firstWriteTxCohort = firstTx.ready();
+
+        /**
+         * We alocate chained transaction - read transaction, note first one is
+         * still not commited to datastore.
+         */
+        DOMStoreReadTransaction secondReadTx = txChain.newReadOnlyTransaction();
+
+        /**
+         *
+         * We test if we are able to read data from tx, read should not fail
+         * since we are using chained transaction.
+         *
+         *
+         */
+        assertTestContainerExists(secondReadTx);
+
+        /**
+         *
+         * We alocate next transaction, which is still based on first one, but
+         * is read-write.
+         *
+         */
+        DOMStoreReadWriteTransaction thirdDeleteTx = txChain.newReadWriteTransaction();
+
+        /**
+         * We test existence of /test in third transaction container should
+         * still be visible from first one (which is still uncommmited).
+         *
+         *
+         */
+        assertTestContainerExists(thirdDeleteTx);
+
+        /**
+         * We delete node in third transaction
+         */
+        thirdDeleteTx.delete(TestModel.TEST_PATH);
+
+        /**
+         * third transaction is sealed.
+         */
+        DOMStoreThreePhaseCommitCohort thirdDeleteTxCohort = thirdDeleteTx.ready();
+
+        /**
+         * We commit first transaction
+         *
+         */
+        assertThreePhaseCommit(firstWriteTxCohort);
+
+        // Alocates store transacion
+        DOMStoreReadTransaction storeReadTx = domStore.newReadOnlyTransaction();
+        /**
+         * We verify transaction is commited to store, container should exists
+         * in datastore.
+         */
+        assertTestContainerExists(storeReadTx);
+        /**
+         * We commit third transaction
+         *
+         */
+        assertThreePhaseCommit(thirdDeleteTxCohort);
+    }
+
+    @Test
+    @Ignore
+    public void testTransactionConflict() throws InterruptedException, ExecutionException {
+        DOMStoreReadWriteTransaction txOne = domStore.newReadWriteTransaction();
+        DOMStoreReadWriteTransaction txTwo = domStore.newReadWriteTransaction();
+        assertTestContainerWrite(txOne);
+        assertTestContainerWrite(txTwo);
+
+        /**
+         * Commits transaction
+         */
+        assertThreePhaseCommit(txOne.ready());
+
+        /**
+         * Asserts that txTwo could not be commited
+         */
+        assertFalse(txTwo.ready().canCommit().get());
+    }
+
+    private static void assertThreePhaseCommit(final DOMStoreThreePhaseCommitCohort cohort)
+            throws InterruptedException, ExecutionException {
+        assertTrue(cohort.canCommit().get().booleanValue());
+        cohort.preCommit().get();
+        cohort.commit().get();
+    }
+
+    private static Optional<NormalizedNode<?, ?>> assertTestContainerWrite(final DOMStoreReadWriteTransaction writeTx)
+            throws InterruptedException, ExecutionException {
+        /**
+         *
+         * Writes /test in writeTx
+         *
+         */
+        writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        return assertTestContainerExists(writeTx);
+    }
+
+    /**
+     * Reads /test from readTx Read should return container.
+     */
+    private static Optional<NormalizedNode<?, ?>> assertTestContainerExists(final DOMStoreReadTransaction readTx)
+            throws InterruptedException, ExecutionException {
 
-    ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = readTx.read(TestModel.TEST_PATH);
-    assertTrue(writeTxContainer.get().isPresent());
-    return writeTxContainer.get();
-  }
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = readTx.read(TestModel.TEST_PATH);
+        assertTrue(writeTxContainer.get().isPresent());
+        return writeTxContainer.get();
+    }
 
 }
index 9ef44f6..33789fb 100644 (file)
@@ -16,15 +16,18 @@ import static org.opendaylight.controller.sal.connect.netconf.util.NetconfMessag
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -49,11 +52,12 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction
         this.id = id;
     }
 
-    public ListenableFuture<Optional<NormalizedNode<?, ?>>> readConfigurationData(final YangInstanceIdentifier path) {
+    private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readConfigurationData(
+            final YangInstanceIdentifier path) {
         final ListenableFuture<RpcResult<CompositeNode>> future = rpc.invokeRpc(NETCONF_GET_CONFIG_QNAME,
                 NetconfMessageTransformUtil.wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path)));
 
-        return Futures.transform(future, new Function<RpcResult<CompositeNode>, Optional<NormalizedNode<?, ?>>>() {
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> transformedFuture = Futures.transform(future, new Function<RpcResult<CompositeNode>, Optional<NormalizedNode<?, ?>>>() {
             @Override
             public Optional<NormalizedNode<?, ?>> apply(final RpcResult<CompositeNode> result) {
                 checkReadSuccess(result, path);
@@ -66,6 +70,8 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction
                         transform(path, node);
             }
         });
+
+        return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER);
     }
 
     private void checkReadSuccess(final RpcResult<CompositeNode> result, final YangInstanceIdentifier path) {
@@ -85,10 +91,11 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction
         }
     }
 
-    public ListenableFuture<Optional<NormalizedNode<?, ?>>> readOperationalData(final YangInstanceIdentifier path) {
+    private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readOperationalData(
+            final YangInstanceIdentifier path) {
         final ListenableFuture<RpcResult<CompositeNode>> future = rpc.invokeRpc(NETCONF_GET_QNAME, NetconfMessageTransformUtil.wrap(NETCONF_GET_QNAME, toFilterStructure(path)));
 
-        return Futures.transform(future, new Function<RpcResult<CompositeNode>, Optional<NormalizedNode<?, ?>>>() {
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> transformedFuture = Futures.transform(future, new Function<RpcResult<CompositeNode>, Optional<NormalizedNode<?, ?>>>() {
             @Override
             public Optional<NormalizedNode<?, ?>> apply(final RpcResult<CompositeNode> result) {
                 checkReadSuccess(result, path);
@@ -101,6 +108,8 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction
                         transform(path, node);
             }
         });
+
+        return MappingCheckedFuture.create(transformedFuture, ReadFailedException.MAPPER);
     }
 
     private static Node<?> findNode(final CompositeNode node, final YangInstanceIdentifier identifier) {
@@ -136,7 +145,8 @@ public final class NetconfDeviceReadOnlyTx implements DOMDataReadOnlyTransaction
     }
 
     @Override
-    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+            final LogicalDatastoreType store, final YangInstanceIdentifier path) {
         final YangInstanceIdentifier legacyPath = toLegacyPath(normalizer, path, id);
 
         switch (store) {
index 4054cf9..3d2c3b9 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.sal.connect.netconf.sal.tx;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
@@ -62,7 +64,8 @@ public class NetconfDeviceReadWriteTx implements DOMDataReadWriteTransaction {
     }
 
     @Override
-    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+            final LogicalDatastoreType store, final YangInstanceIdentifier path) {
         return delegateReadTx.read(store, path);
     }
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.