Bug 1534: Changed blocking calls to async in dist data store 65/9965/7
authortpantelis <tpanteli@brocade.com>
Thu, 14 Aug 2014 07:03:47 +0000 (03:03 -0400)
committertpantelis <tpanteli@brocade.com>
Sat, 16 Aug 2014 05:37:09 +0000 (01:37 -0400)
Changed the read and commit methods to use async akka actor calls instead of
submitting tasks to a ListeningExecutorService that do blocking akka
calls. This obviates the need for the ListeningExecutorService.

Change-Id: I7a3f369917431067ad1817a3ab53a358bc21f123
Signed-off-by: tpantelis <tpanteli@brocade.com>
12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java

index 4fa26ffb2033407efb5b1fc1c86f96d04e4e3dd7..404a4e02033ea1c89f9fada4135cfe0e4b6a935e 100644 (file)
@@ -10,9 +10,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
@@ -28,8 +27,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.PropertyUtils;
-import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -42,37 +39,12 @@ import org.slf4j.LoggerFactory;
  */
 public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable {
 
-    private static final Logger
-        LOG = LoggerFactory.getLogger(DistributedDataStore.class);
-
-    private static final String EXECUTOR_MAX_POOL_SIZE_PROP =
-            "mdsal.dist-datastore-executor-pool.size";
-    private static final int DEFAULT_EXECUTOR_MAX_POOL_SIZE = 10;
-
-    private static final String EXECUTOR_MAX_QUEUE_SIZE_PROP =
-            "mdsal.dist-datastore-executor-queue.size";
-    private static final int DEFAULT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
     private final ActorContext actorContext;
 
     private SchemaContext schemaContext;
 
-    /**
-     * Executor used to run FutureTask's
-     *
-     * This is typically used when we need to make a request to an actor and
-     * wait for it's response and the consumer needs to be provided a Future.
-     */
-    private final ListeningExecutorService executor =
-            MoreExecutors.listeningDecorator(
-                    SpecialExecutors.newBlockingBoundedFastThreadPool(
-                            PropertyUtils.getIntSystemProperty(
-                                    EXECUTOR_MAX_POOL_SIZE_PROP,
-                                    DEFAULT_EXECUTOR_MAX_POOL_SIZE),
-                            PropertyUtils.getIntSystemProperty(
-                                    EXECUTOR_MAX_QUEUE_SIZE_PROP,
-                                    DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore"));
-
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
             Configuration configuration, InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
@@ -95,15 +67,16 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     }
 
 
+    @SuppressWarnings("unchecked")
     @Override
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
+    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                                              ListenerRegistration<L> registerChangeListener(
         YangInstanceIdentifier path, L listener,
         AsyncDataBroker.DataChangeScope scope) {
 
         Preconditions.checkNotNull(path, "path should not be null");
         Preconditions.checkNotNull(listener, "listener should not be null");
 
-
         LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
 
         ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
@@ -112,10 +85,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
         Object result = actorContext.executeLocalShardOperation(shardName,
-            new RegisterChangeListener(path, dataChangeListenerActor.path(),
-                scope),
-            ActorContext.ASK_DURATION
-        );
+            new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+            ActorContext.ASK_DURATION);
 
         if (result != null) {
             RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
@@ -127,34 +98,31 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         LOG.debug(
             "No local shard for shardName {} was found so returning a noop registration",
             shardName);
+
         return new NoOpDataChangeListenerRegistration(listener);
     }
 
-
-
-
-
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy(actorContext, executor, schemaContext);
+        return new TransactionChainProxy(actorContext, schemaContext);
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
-            executor, schemaContext);
+            schemaContext);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
-            executor, schemaContext);
+            schemaContext);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
-            executor, schemaContext);
+            schemaContext);
     }
 
     @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
index 97bb196f9fc36b9635e906bb654fafc68b2538c4..49c7b7e78f52295e9b3b1fc5b1ae2ea10926def2 100644 (file)
@@ -53,7 +53,7 @@ public class ShardReadWriteTransaction extends ShardTransaction {
     } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
       mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
     } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      deleteData(transaction,DeleteData.fromSerizalizable(message));
+      deleteData(transaction,DeleteData.fromSerializable(message));
     } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
       readyTransaction(transaction,new ReadyTransaction());
     } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
index 91e578b46d1f6de8f2e841f7d7f459541a9c3bdb..b01fe7d4ac11a4a0eea7cfa28b16f5bea8e7aec5 100644 (file)
@@ -50,7 +50,7 @@ public class ShardWriteTransaction extends ShardTransaction {
     } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
       mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
     } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      deleteData(transaction,DeleteData.fromSerizalizable(message));
+      deleteData(transaction,DeleteData.fromSerializable(message));
     } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
       readyTransaction(transaction,new ReadyTransaction());
     }else {
index 5b447943ea7fd798e5572e55483ff8b4fcf7e331..fc455b193e27118f6dcfcc2de93032f2676c5619 100644 (file)
@@ -10,11 +10,13 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
 
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
 
-import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -28,124 +30,156 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.Future;
+
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
 
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
  */
-public class ThreePhaseCommitCohortProxy implements
-    DOMStoreThreePhaseCommitCohort{
+public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
 
-    private static final Logger
-        LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
     private final ActorContext actorContext;
     private final List<ActorPath> cohortPaths;
-    private final ListeningExecutorService executor;
     private final String transactionId;
 
-
-    public ThreePhaseCommitCohortProxy(ActorContext actorContext,
-        List<ActorPath> cohortPaths,
-        String transactionId,
-        ListeningExecutorService executor) {
-
+    public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths,
+            String transactionId) {
         this.actorContext = actorContext;
         this.cohortPaths = cohortPaths;
         this.transactionId = transactionId;
-        this.executor = executor;
     }
 
-    @Override public ListenableFuture<Boolean> canCommit() {
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
         LOG.debug("txn {} canCommit", transactionId);
-        Callable<Boolean> call = new Callable<Boolean>() {
 
+        Future<Iterable<Object>> combinedFuture =
+                invokeCohorts(new CanCommitTransaction().toSerializable());
+
+        final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+        combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
             @Override
-            public Boolean call() throws Exception {
-                for(ActorPath actorPath : cohortPaths){
-
-                    Object message = new CanCommitTransaction().toSerializable();
-                    LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
-
-                    ActorSelection cohort = actorContext.actorSelection(actorPath);
-
-                    try {
-                        Object response =
-                                actorContext.executeRemoteOperation(cohort,
-                                        message,
-                                        ActorContext.ASK_DURATION);
-
-                        if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
-                            CanCommitTransactionReply reply =
-                                    CanCommitTransactionReply.fromSerializable(response);
-                            if (!reply.getCanCommit()) {
-                                return false;
-                            }
+            public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
+                if(failure != null) {
+                    returnFuture.setException(failure);
+                    return;
+                }
+
+                boolean result = true;
+                for(Object response: responses) {
+                    if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+                        CanCommitTransactionReply reply =
+                                CanCommitTransactionReply.fromSerializable(response);
+                        if (!reply.getCanCommit()) {
+                            result = false;
+                            break;
                         }
-                    } catch(RuntimeException e){
-                        // FIXME : Need to properly handle this
-                        LOG.error("Unexpected Exception", e);
-                        return false;
+                    } else {
+                        LOG.error("Unexpected response type {}", response.getClass());
+                        returnFuture.setException(new IllegalArgumentException(
+                                String.format("Unexpected response type {}", response.getClass())));
+                        return;
                     }
                 }
 
-                return true;
+                returnFuture.set(Boolean.valueOf(result));
             }
-        };
+        }, actorContext.getActorSystem().dispatcher());
+
+        return returnFuture;
+    }
+
+    private Future<Iterable<Object>> invokeCohorts(Object message) {
+        List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
+        for(ActorPath actorPath : cohortPaths) {
+
+            LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
 
-        return executor.submit(call);
+            ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+            futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
+                    ActorContext.ASK_DURATION));
+        }
+
+        return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
     }
 
-    @Override public ListenableFuture<Void> preCommit() {
+    @Override
+    public ListenableFuture<Void> preCommit() {
         LOG.debug("txn {} preCommit", transactionId);
-        return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
+        return voidOperation(new PreCommitTransaction().toSerializable(),
+                PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
     }
 
-    @Override public ListenableFuture<Void> abort() {
+    @Override
+    public ListenableFuture<Void> abort() {
         LOG.debug("txn {} abort", transactionId);
-        return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
+
+        // Note - we pass false for propagateException. In the front-end data broker, this method
+        // is called when one of the 3 phases fails with an exception. We'd rather have that
+        // original exception propagated to the client. If our abort fails and we propagate the
+        // exception then that exception will supersede and suppress the original exception. But
+        // it's the original exception that is the root cause and of more interest to the client.
+
+        return voidOperation(new AbortTransaction().toSerializable(),
+                AbortTransactionReply.SERIALIZABLE_CLASS, false);
     }
 
-    @Override public ListenableFuture<Void> commit() {
+    @Override
+    public ListenableFuture<Void> commit() {
         LOG.debug("txn {} commit", transactionId);
-        return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
+        return voidOperation(new CommitTransaction().toSerializable(),
+                CommitTransactionReply.SERIALIZABLE_CLASS, true);
     }
 
-    private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
-        Callable<Void> call = new Callable<Void>() {
-
-            @Override public Void call() throws Exception {
-                for(ActorPath actorPath : cohortPaths){
-                    ActorSelection cohort = actorContext.actorSelection(actorPath);
-
-                    LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
-
-                    try {
-                        Object response =
-                            actorContext.executeRemoteOperation(cohort,
-                                message,
-                                ActorContext.ASK_DURATION);
-
-                        if (response != null && !response.getClass()
-                            .equals(expectedResponseClass)) {
-                            throw new RuntimeException(
-                                String.format(
-                                    "did not get the expected response \n\t\t expected : %s \n\t\t actual   : %s",
-                                    expectedResponseClass.toString(),
-                                    response.getClass().toString())
-                            );
+    private ListenableFuture<Void> voidOperation(final Object message,
+            final Class<?> expectedResponseClass, final boolean propagateException) {
+
+        Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+
+        final SettableFuture<Void> returnFuture = SettableFuture.create();
+
+        combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+            @Override
+            public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
+
+                Throwable exceptionToPropagate = failure;
+                if(exceptionToPropagate == null) {
+                    for(Object response: responses) {
+                        if(!response.getClass().equals(expectedResponseClass)) {
+                            exceptionToPropagate = new IllegalArgumentException(
+                                    String.format("Unexpected response type {}",
+                                            response.getClass()));
+                            break;
                         }
-                    } catch(TimeoutException e){
-                        LOG.error(String.format("A timeout occurred when processing operation : %s", message));
                     }
                 }
-                return null;
+
+                if(exceptionToPropagate != null) {
+                    if(propagateException) {
+                        // We don't log the exception here to avoid redundant logging since we're
+                        // propagating to the caller in MD-SAL core who will log it.
+                        returnFuture.setException(exceptionToPropagate);
+                    } else {
+                        // Since the caller doesn't want us to propagate the exception we'll also
+                        // not log it normally. But it's usually not good to totally silence
+                        // exceptions so we'll log it to debug level.
+                        LOG.debug(String.format("%s failed",  message.getClass().getSimpleName()),
+                                exceptionToPropagate);
+                        returnFuture.set(null);
+                    }
+                } else {
+                    returnFuture.set(null);
+                }
             }
-        };
+        }, actorContext.getActorSystem().dispatcher());
 
-        return executor.submit(call);
+        return returnFuture;
     }
 
     public List<ActorPath> getCohortPaths() {
index 5e9defa5b59c5db64033c1a92fbdb611766152a2..c4ec760b40cd57d54579144f010b5e0f7425363e 100644 (file)
@@ -15,39 +15,34 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-
 /**
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
  */
 public class TransactionChainProxy implements DOMStoreTransactionChain{
     private final ActorContext actorContext;
-    private final ListeningExecutorService transactionExecutor;
     private final SchemaContext schemaContext;
 
-    public TransactionChainProxy(ActorContext actorContext, ListeningExecutorService transactionExecutor,
-            SchemaContext schemaContext) {
+    public TransactionChainProxy(ActorContext actorContext, SchemaContext schemaContext) {
         this.actorContext = actorContext;
-        this.transactionExecutor = transactionExecutor;
         this.schemaContext = schemaContext;
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, schemaContext);
+            TransactionProxy.TransactionType.READ_ONLY, schemaContext);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor, schemaContext);
+            TransactionProxy.TransactionType.WRITE_ONLY, schemaContext);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_WRITE, transactionExecutor, schemaContext);
+            TransactionProxy.TransactionType.READ_WRITE, schemaContext);
     }
 
     @Override
index 95862ae9d93670de67dbdd8bd99074a9ba49e8b8..5b5b1296af08e42c2278b6db80ac05cbf08329c6 100644 (file)
@@ -12,13 +12,14 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
+import akka.dispatch.OnComplete;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import com.google.common.util.concurrent.SettableFuture;
+
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -44,11 +45,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.Future;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -80,25 +82,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final ActorContext actorContext;
     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
     private final TransactionIdentifier identifier;
-    private final ListeningExecutorService executor;
     private final SchemaContext schemaContext;
+    private boolean inReadyState;
 
-    public TransactionProxy(
-        ActorContext actorContext,
-        TransactionType transactionType,
-        ListeningExecutorService executor,
-        SchemaContext schemaContext
-    ) {
+    public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
+            SchemaContext schemaContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
         this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
-        this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
         this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
 
         String memberName = actorContext.getCurrentMemberName();
         if(memberName == null){
             memberName = "UNKNOWN-MEMBER";
         }
-        this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
+
+        this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
+                counter.getAndIncrement()).build();
 
         LOG.debug("Created txn {}", identifier);
 
@@ -108,6 +107,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final YangInstanceIdentifier path) {
 
+        Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
+                "Read operation on write-only transaction is not allowed");
+
         LOG.debug("txn {} read {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
@@ -115,8 +117,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return transactionContext(path).readData(path);
     }
 
-    @Override public CheckedFuture<Boolean, ReadFailedException> exists(
-        YangInstanceIdentifier path) {
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
+
+        Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
+                "Exists operation on write-only transaction is not allowed");
+
         LOG.debug("txn {} exists {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
@@ -124,9 +130,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return transactionContext(path).dataExists(path);
     }
 
+    private void checkModificationState() {
+        Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
+                "Modification operation on read-only transaction is not allowed");
+        Preconditions.checkState(!inReadyState,
+                "Transaction is sealed - further modifications are allowed");
+    }
+
     @Override
     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
 
+        checkModificationState();
+
         LOG.debug("txn {} write {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
@@ -137,6 +152,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     @Override
     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
 
+        checkModificationState();
+
         LOG.debug("txn {} merge {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
@@ -147,6 +164,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     @Override
     public void delete(YangInstanceIdentifier path) {
 
+        checkModificationState();
+
         LOG.debug("txn {} delete {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
@@ -156,25 +175,36 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     @Override
     public DOMStoreThreePhaseCommitCohort ready() {
+
+        checkModificationState();
+
+        inReadyState = true;
+
         List<ActorPath> cohortPaths = new ArrayList<>();
 
-        LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
+        LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier,
+                remoteTransactionPaths.size());
 
         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
 
-            LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
+            LOG.debug("txn {} Readying transaction for shard {}", identifier,
+                    transactionContext.getShardName());
 
             Object result = transactionContext.readyTransaction();
 
             if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
-                ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
-                String resolvedCohortPath = transactionContext
-                    .getResolvedCohortPath(reply.getCohortPath().toString());
+                ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
+                        actorContext.getActorSystem(),result);
+                String resolvedCohortPath = transactionContext.getResolvedCohortPath(
+                        reply.getCohortPath().toString());
                 cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
+            } else {
+                LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS,
+                        result.getClass());
             }
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString());
     }
 
     @Override
@@ -213,8 +243,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             Object response = actorContext.executeShardOperation(shardName,
                 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
                 ActorContext.ASK_DURATION);
-            if (response.getClass()
-                .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+            if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                 CreateTransactionReply reply =
                     CreateTransactionReply.fromSerializable(response);
 
@@ -229,11 +258,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         transactionActor);
 
                 remoteTransactionPaths.put(shardName, transactionContext);
+            } else {
+                LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS,
+                        response.getClass());
             }
-        } catch(TimeoutException | PrimaryNotFoundException e){
+        } catch(Exception e){
             LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
-            remoteTransactionPaths.put(shardName,
-                new NoOpTransactionContext(shardName));
+            remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
         }
     }
 
@@ -272,7 +303,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             this.actor = actor;
         }
 
-        @Override public String getShardName() {
+        @Override
+        public String getShardName() {
             return shardName;
         }
 
@@ -280,96 +312,105 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             return actor;
         }
 
-        @Override public String getResolvedCohortPath(String cohortPath) {
+        @Override
+        public String getResolvedCohortPath(String cohortPath) {
             return actorContext.resolvePath(actorPath, cohortPath);
         }
 
-        @Override public void closeTransaction() {
-            getActor().tell(
-                new CloseTransaction().toSerializable(), null);
+        @Override
+        public void closeTransaction() {
+            actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
         }
 
-        @Override public Object readyTransaction() {
+        @Override
+        public Object readyTransaction() {
             return actorContext.executeRemoteOperation(getActor(),
-                new ReadyTransaction().toSerializable(),
-                ActorContext.ASK_DURATION
-            );
-
+                    new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
         }
 
-        @Override public void deleteData(YangInstanceIdentifier path) {
-            getActor().tell(new DeleteData(path).toSerializable(), null);
+        @Override
+        public void deleteData(YangInstanceIdentifier path) {
+            actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() );
         }
 
-        @Override public void mergeData(YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data) {
-            getActor()
-                .tell(new MergeData(path, data, schemaContext).toSerializable(),
-                    null);
+        @Override
+        public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            actorContext.sendRemoteOperationAsync(getActor(),
+                    new MergeData(path, data, schemaContext).toSerializable());
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
             final YangInstanceIdentifier path) {
 
-            Callable<Optional<NormalizedNode<?, ?>>> call =
-                new Callable<Optional<NormalizedNode<?, ?>>>() {
-
-                    @Override public Optional<NormalizedNode<?, ?>> call()
-                        throws Exception {
-                        Object response = actorContext
-                            .executeRemoteOperation(getActor(),
-                                new ReadData(path).toSerializable(),
-                                ActorContext.ASK_DURATION);
-                        if (response.getClass()
-                            .equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-                            ReadDataReply reply = ReadDataReply
-                                .fromSerializable(schemaContext, path,
-                                    response);
+            final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
+
+            OnComplete<Object> onComplete = new OnComplete<Object>() {
+                @Override
+                public void onComplete(Throwable failure, Object response) throws Throwable {
+                    if(failure != null) {
+                        returnFuture.setException(new ReadFailedException(
+                                "Error reading data for path " + path, failure));
+                    } else {
+                        if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+                            ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
+                                    path, response);
                             if (reply.getNormalizedNode() == null) {
-                                return Optional.absent();
+                                returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
+                            } else {
+                                returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
+                                        reply.getNormalizedNode()));
                             }
-                            return Optional.<NormalizedNode<?, ?>>of(
-                                reply.getNormalizedNode());
+                        } else {
+                            returnFuture.setException(new ReadFailedException(
+                                    "Invalid response reading data for path " + path));
                         }
-
-                        throw new ReadFailedException("Read Failed " + path);
                     }
-                };
+                }
+            };
 
-            return MappingCheckedFuture
-                .create(executor.submit(call), ReadFailedException.MAPPER);
-        }
+            Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+                    new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
+            future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
 
-        @Override public void writeData(YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data) {
-            getActor()
-                .tell(new WriteData(path, data, schemaContext).toSerializable(),
-                    null);
+            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
         }
 
-        @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
-            final YangInstanceIdentifier path) {
-
-            Callable<Boolean> call = new Callable<Boolean>() {
-
-                @Override public Boolean call() throws Exception {
-                    Object o = actorContext.executeRemoteOperation(getActor(),
-                        new DataExists(path).toSerializable(),
-                        ActorContext.ASK_DURATION
-                    );
-
+        @Override
+        public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            actorContext.sendRemoteOperationAsync(getActor(),
+                    new WriteData(path, data, schemaContext).toSerializable());
+        }
 
-                    if (DataExistsReply.SERIALIZABLE_CLASS
-                        .equals(o.getClass())) {
-                        return DataExistsReply.fromSerializable(o).exists();
+        @Override
+        public CheckedFuture<Boolean, ReadFailedException> dataExists(
+                final YangInstanceIdentifier path) {
+
+            final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+            OnComplete<Object> onComplete = new OnComplete<Object>() {
+                @Override
+                public void onComplete(Throwable failure, Object response) throws Throwable {
+                    if(failure != null) {
+                        returnFuture.setException(new ReadFailedException(
+                                "Error checking exists for path " + path, failure));
+                    } else {
+                        if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+                            returnFuture.set(Boolean.valueOf(DataExistsReply.
+                                        fromSerializable(response).exists()));
+                        } else {
+                            returnFuture.setException(new ReadFailedException(
+                                    "Invalid response checking exists for path " + path));
+                        }
                     }
-
-                    throw new ReadFailedException("Exists Failed " + path);
                 }
             };
-            return MappingCheckedFuture
-                .create(executor.submit(call), ReadFailedException.MAPPER);
+
+            Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+                    new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
+            future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+
+            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
         }
     }
 
@@ -379,22 +420,28 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
         private final String shardName;
+        private final Exception failure;
 
         private ActorRef cohort;
 
-        public NoOpTransactionContext(String shardName){
+        public NoOpTransactionContext(String shardName, Exception failure){
             this.shardName = shardName;
+            this.failure = failure;
         }
-        @Override public String getShardName() {
+
+        @Override
+        public String getShardName() {
             return  shardName;
 
         }
 
-        @Override public String getResolvedCohortPath(String cohortPath) {
+        @Override
+        public String getResolvedCohortPath(String cohortPath) {
             return cohort.path().toString();
         }
 
-        @Override public void closeTransaction() {
+        @Override
+        public void closeTransaction() {
             LOG.warn("txn {} closeTransaction called", identifier);
         }
 
@@ -404,11 +451,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             return new ReadyTransactionReply(cohort.path()).toSerializable();
         }
 
-        @Override public void deleteData(YangInstanceIdentifier path) {
+        @Override
+        public void deleteData(YangInstanceIdentifier path) {
             LOG.warn("txt {} deleteData called path = {}", identifier, path);
         }
 
-        @Override public void mergeData(YangInstanceIdentifier path,
+        @Override
+        public void mergeData(YangInstanceIdentifier path,
             NormalizedNode<?, ?> data) {
             LOG.warn("txn {} mergeData called path = {}", identifier, path);
         }
@@ -417,8 +466,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
             YangInstanceIdentifier path) {
             LOG.warn("txn {} readData called path = {}", identifier, path);
-            return Futures.immediateCheckedFuture(
-                Optional.<NormalizedNode<?, ?>>absent());
+            return Futures.immediateFailedCheckedFuture(new ReadFailedException(
+                    "Error reading data for path " + path, failure));
         }
 
         @Override public void writeData(YangInstanceIdentifier path,
@@ -429,10 +478,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
             YangInstanceIdentifier path) {
             LOG.warn("txn {} dataExists called path = {}", identifier, path);
-
-            // Returning false instead of an exception to keep this aligned with
-            // read
-            return Futures.immediateCheckedFuture(false);
+            return Futures.immediateFailedCheckedFuture(new ReadFailedException(
+                    "Error checking exists for path " + path, failure));
         }
     }
 
index 17861a5a68b0e7da27f7b7996ddb2c87cdcd22ad..9ae851e76c0f4db04776d14e7cfeab1ab20fefaa 100644 (file)
@@ -31,7 +31,7 @@ public class DeleteData implements SerializableMessage {
             .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path)).build();
     }
 
-    public static DeleteData fromSerizalizable(Object serializable){
+    public static DeleteData fromSerializable(Object serializable){
         ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable;
         return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
     }
index 4706c66e2594eae1384b465bf5d0b246c72d8223..e12a9663d1fca8a969c25166f38986ea19eab51a 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.util.Timeout;
+
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
@@ -22,9 +23,9 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -54,8 +55,6 @@ public class ActorContext {
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
 
-    private SchemaContext schemaContext = null;
-
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
         ClusterWrapper clusterWrapper,
         Configuration configuration) {
@@ -174,6 +173,33 @@ public class ActorContext {
         }
     }
 
+    /**
+     * Execute an operation on a remote actor asynchronously.
+     *
+     * @param actor the ActorSelection
+     * @param message the message to send
+     * @param duration the maximum amount of time to send he message
+     * @return a Future containing the eventual result
+     */
+    public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
+            FiniteDuration duration) {
+
+        LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+
+        return ask(actor, message, new Timeout(duration));
+    }
+
+    /**
+     * Sends an operation to be executed by a remote actor asynchronously without waiting for a
+     * reply (essentially set and forget).
+     *
+     * @param actor the ActorSelection
+     * @param message the message to send
+     */
+    public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
+        actor.tell(message, ActorRef.noSender());
+    }
+
     /**
      * Execute an operation on the primary for a given shard
      * <p>
index 4eca5671f626543b5c9ca8052f1427945c72a302..87231f08849ed02398472bb792a40a48753a96be 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
+import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
 import akka.actor.Props;
+import akka.dispatch.Futures;
 
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import junit.framework.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
 
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.Stubber;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import scala.concurrent.duration.FiniteDuration;
 
-import java.util.Arrays;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertNotNull;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
-    private ThreePhaseCommitCohortProxy proxy;
-    private Props props;
-    private ActorRef actorRef;
-    private MockActorContext actorContext;
-    private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
-                        Executors.newSingleThreadExecutor());
+    @Mock
+    private ActorContext actorContext;
 
     @Before
-    public void setUp(){
-        props = Props.create(MessageCollectorActor.class);
-        actorRef = getSystem().actorOf(props);
-        actorContext = new MockActorContext(this.getSystem());
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
 
-        proxy =
-            new ThreePhaseCommitCohortProxy(actorContext,
-                Arrays.asList(actorRef.path()), "txn-1", executor);
+        doReturn(getSystem()).when(actorContext).getActorSystem();
+    }
 
+    private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) {
+        List<ActorPath> cohorts = Lists.newArrayList();
+        for(int i = 1; i <= nCohorts; i++) {
+            ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path();
+            cohorts.add(path);
+            doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+        }
+
+        return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
     }
 
-    @After
-    public void tearDown() {
-        executor.shutdownNow();
+    private void setupMockActorContext(Class<?> requestType, Object... responses) {
+        Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
+                .failed((Throwable) responses[0]) : Futures
+                .successful(((SerializableMessage) responses[0]).toSerializable()));
+
+        for(int i = 1; i < responses.length; i++) {
+            stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
+                    .failed((Throwable) responses[i]) : Futures
+                    .successful(((SerializableMessage) responses[i]).toSerializable()));
+        }
+
+        stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
+                isA(requestType), any(FiniteDuration.class));
+    }
+
+    private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
+        verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
+                any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
+    }
+
+    @Test
+    public void testCanCommitWithOneCohort() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                new CanCommitTransactionReply(true));
+
+        ListenableFuture<Boolean> future = proxy.canCommit();
+
+        assertEquals("canCommit", true, future.get());
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                new CanCommitTransactionReply(false));
+
+        future = proxy.canCommit();
+
+        assertEquals("canCommit", false, future.get());
+
+        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
 
     @Test
-    public void testCanCommit() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable());
+    public void testCanCommitWithMultipleCohorts() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        Assert.assertTrue(future.get().booleanValue());
+        assertEquals("canCommit", true, future.get());
 
+        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+    }
+
+    @Test
+    public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(3);
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
+                new CanCommitTransactionReply(true));
+
+        ListenableFuture<Boolean> future = proxy.canCommit();
+
+        assertEquals("canCommit", false, future.get());
+
+        verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+    }
+
+    @Test(expected = ExecutionException.class)
+    public void testCanCommitWithExceptionFailure() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+
+        proxy.canCommit().get();
+    }
+
+    @Test(expected = ExecutionException.class)
+    public void testCanCommitWithInvalidResponseType() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                new PreCommitTransactionReply());
+
+        proxy.canCommit().get();
     }
 
     @Test
     public void testPreCommit() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply().toSerializable());
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        ListenableFuture<Void> future = proxy.preCommit();
+        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+                new PreCommitTransactionReply());
 
-        future.get();
+        proxy.preCommit().get();
 
+        verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
+    }
+
+    @Test(expected = ExecutionException.class)
+    public void testPreCommitWithFailure() throws Exception {
+        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+                new PreCommitTransactionReply(), new RuntimeException("mock"));
+
+        proxy.preCommit().get();
     }
 
     @Test
     public void testAbort() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply().toSerializable());
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        ListenableFuture<Void> future = proxy.abort();
+        setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
 
-        future.get();
+        proxy.abort().get();
 
+        verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
+    }
+
+    @Test
+    public void testAbortWithFailure() throws Exception {
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+        setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+
+        // The exception should not get propagated.
+        proxy.abort().get();
+
+        verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
     }
 
     @Test
     public void testCommit() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply().toSerializable());
 
-        ListenableFuture<Void> future = proxy.commit();
+        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+                new CommitTransactionReply());
+
+        proxy.commit().get();
+
+        verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+    }
+
+    @Test(expected = ExecutionException.class)
+    public void testCommitWithFailure() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
-        future.get();
+        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+                new RuntimeException("mock"));
+
+        proxy.commit().get();
+    }
+
+    @Test(expected = ExecutionException.class)
+    public void teseCommitWithInvalidResponseType() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
+
+        proxy.commit().get();
     }
 
     @Test
-    public void testGetCohortPaths() throws Exception {
-        assertNotNull(proxy.getCohortPaths());
+    public void testGetCohortPaths() {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+        List<ActorPath> paths = proxy.getCohortPaths();
+        assertNotNull("getCohortPaths returned null", paths);
+        assertEquals("getCohortPaths size", 2, paths.size());
     }
 }
index 62052f38ab89b6962dc31622332f85113492c924..14696f786e7e36888be3b5517c14ae4b9779340f 100644 (file)
@@ -1,32 +1,44 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import akka.actor.ActorPath;
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.Props;
+import akka.dispatch.Futures;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import junit.framework.Assert;
-import org.junit.After;
+
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -34,377 +46,433 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.List;
-import java.util.concurrent.Executors;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
-import static junit.framework.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.isA;
+
+@SuppressWarnings("resource")
 public class TransactionProxyTest extends AbstractActorTest {
 
+    @SuppressWarnings("serial")
+    static class TestException extends RuntimeException {
+    }
+
+    static interface Invoker {
+        void invoke(TransactionProxy proxy) throws Exception;
+    }
+
     private final Configuration configuration = new MockConfiguration();
 
-    private final ActorContext testContext =
-        new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );
+    @Mock
+    private ActorContext mockActorContext;
 
-    private final ListeningExecutorService transactionExecutor =
-        MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+    private SchemaContext schemaContext;
+
+    String memberName = "mock-member";
 
     @Before
     public void setUp(){
-        ShardStrategyFactory.setConfiguration(configuration);
-    }
+        MockitoAnnotations.initMocks(this);
 
-    @After
-    public void tearDown() {
-        transactionExecutor.shutdownNow();
-    }
+        schemaContext = TestModel.createTestContext();
 
-    @Test
-    public void testRead() throws Exception {
-        final Props props = Props.create(DoNothingActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
+        doReturn(getSystem()).when(mockActorContext).getActorSystem();
 
-        final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
-        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
-        actorContext.setExecuteRemoteOperationResponse("message");
+        ShardStrategyFactory.setConfiguration(configuration);
+    }
 
+    private CreateTransaction eqCreateTransaction(final String memberName,
+            final TransactionType type) {
+        ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
+            @Override
+            public boolean matches(Object argument) {
+                CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+                return obj.getTransactionId().startsWith(memberName) &&
+                       obj.getTransactionType() == type.ordinal();
+            }
+        };
+
+        return argThat(matcher);
+    }
 
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+    private DataExists eqDataExists() {
+        ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+            @Override
+            public boolean matches(Object argument) {
+                DataExists obj = DataExists.fromSerializable(argument);
+                return obj.getPath().equals(TestModel.TEST_PATH);
+            }
+        };
 
+        return argThat(matcher);
+    }
 
-        actorContext.setExecuteRemoteOperationResponse(
-            new ReadDataReply(TestModel.createTestContext(), null)
-                .toSerializable());
+    private ReadData eqReadData() {
+        ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+            @Override
+            public boolean matches(Object argument) {
+                ReadData obj = ReadData.fromSerializable(argument);
+                return obj.getPath().equals(TestModel.TEST_PATH);
+            }
+        };
 
-        ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
-            transactionProxy.read(TestModel.TEST_PATH);
+        return argThat(matcher);
+    }
 
-        Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
+    private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+        ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
+            @Override
+            public boolean matches(Object argument) {
+                WriteData obj = WriteData.fromSerializable(argument, schemaContext);
+                return obj.getPath().equals(TestModel.TEST_PATH) &&
+                       obj.getData().equals(nodeToWrite);
+            }
+        };
+
+        return argThat(matcher);
+    }
 
-        Assert.assertFalse(normalizedNodeOptional.isPresent());
+    private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+        ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
+            @Override
+            public boolean matches(Object argument) {
+                MergeData obj = MergeData.fromSerializable(argument, schemaContext);
+                return obj.getPath().equals(TestModel.TEST_PATH) &&
+                       obj.getData().equals(nodeToWrite);
+            }
+        };
+
+        return argThat(matcher);
+    }
 
-        actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
-            TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
+    private DeleteData eqDeleteData() {
+        ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
+            @Override
+            public boolean matches(Object argument) {
+                DeleteData obj = DeleteData.fromSerializable(argument);
+                return obj.getPath().equals(TestModel.TEST_PATH);
+            }
+        };
 
-        read = transactionProxy.read(TestModel.TEST_PATH);
+        return argThat(matcher);
+    }
 
-        normalizedNodeOptional = read.get();
+    private Object readyTxReply(ActorPath path) {
+        return new ReadyTransactionReply(path).toSerializable();
+    }
 
-        Assert.assertTrue(normalizedNodeOptional.isPresent());
+    private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
+        return Futures.successful(new ReadDataReply(schemaContext, data)
+                .toSerializable());
     }
 
-    @Test
-    public void testExists() throws Exception {
-        final Props props = Props.create(DoNothingActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
+    private Future<Object> dataExistsReply(boolean exists) {
+        return Futures.successful(new DataExistsReply(exists).toSerializable());
+    }
 
-        final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
-        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
-        actorContext.setExecuteRemoteOperationResponse("message");
+    private ActorSelection actorSelection(ActorRef actorRef) {
+        return getSystem().actorSelection(actorRef.path());
+    }
 
+    private FiniteDuration anyDuration() {
+        return any(FiniteDuration.class);
+    }
 
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+    private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+        return CreateTransactionReply.newBuilder()
+            .setTransactionActorPath(actorRef.path().toString())
+            .setTransactionId("txn-1").build();
+    }
 
+    private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
+        ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
+        doReturn(getSystem().actorSelection(actorRef.path())).
+                when(mockActorContext).actorSelection(actorRef.path().toString());
+        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+        doReturn(createTransactionReply(actorRef)).when(mockActorContext).
+                executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
+                        eqCreateTransaction(memberName, type), anyDuration());
+        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
+                anyString(), eq(actorRef.path().toString()));
+        doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
+
+        return actorRef;
+    }
 
-        actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(false).toSerializable());
+    @Test
+    public void testRead() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
-        CheckedFuture<Boolean, ReadFailedException> exists =
-            transactionProxy.exists(TestModel.TEST_PATH);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_ONLY, schemaContext);
 
-        Assert.assertFalse(exists.checkedGet());
+        doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
-        actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(true).toSerializable());
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+                TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
-        exists = transactionProxy.exists(TestModel.TEST_PATH);
+        assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
 
-        Assert.assertTrue(exists.checkedGet());
+        NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        actorContext.setExecuteRemoteOperationResponse("bad message");
+        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
-        exists = transactionProxy.exists(TestModel.TEST_PATH);
+        readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
-        try {
-            exists.checkedGet();
-            fail();
-        } catch(ReadFailedException e){
-        }
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
 
+        assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
     }
 
     @Test(expected = ReadFailedException.class)
     public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception {
-        final Props props = Props.create(DoNothingActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
-
-        final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
-        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
-        actorContext.setExecuteRemoteOperationResponse("message");
-
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+        setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
+        doReturn(Futures.successful(new Object())).when(mockActorContext).
+                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_ONLY, schemaContext);
 
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>
-            read = transactionProxy.read(TestModel.TEST_PATH);
-
-        read.checkedGet();
+        transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
     }
 
-    @Test
-    public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
-        final ActorContext actorContext = mock(ActorContext.class);
-
-        when(actorContext.executeShardOperation(anyString(), any(), any(
-            FiniteDuration.class))).thenThrow(new PrimaryNotFoundException("test"));
+    @Test(expected = TestException.class)
+    public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
+        setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+        doThrow(new TestException()).when(mockActorContext).
+                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_ONLY, schemaContext);
 
-        ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
-            transactionProxy.read(TestModel.TEST_PATH);
-
-        Assert.assertFalse(read.get().isPresent());
-
+        try {
+            transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+            fail("Expected ReadFailedException");
+        } catch(ReadFailedException e) {
+            // Expected - throw cause - expects TestException.
+            throw e.getCause();
+        }
     }
 
+    private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
+            throws Throwable {
 
-    @Test
-    public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
-        final ActorContext actorContext = mock(ActorContext.class);
+        doThrow(exToThrow).when(mockActorContext).executeShardOperation(
+                anyString(), any(), anyDuration());
 
-        when(actorContext.executeShardOperation(anyString(), any(), any(
-            FiniteDuration.class))).thenThrow(new TimeoutException("test", new Exception("reason")));
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_ONLY, schemaContext);
 
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+        try {
+            invoker.invoke(transactionProxy);
+            fail("Expected ReadFailedException");
+        } catch(ReadFailedException e) {
+            // Expected - throw cause - expects TestException.
+            throw e.getCause();
+        }
+    }
 
+    private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
+        testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
+            @Override
+            public void invoke(TransactionProxy proxy) throws Exception {
+                proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+            }
+        });
+    }
 
-        ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
-            transactionProxy.read(TestModel.TEST_PATH);
+    @Test(expected = PrimaryNotFoundException.class)
+    public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
+        testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
+    }
 
-        Assert.assertFalse(read.get().isPresent());
+    @Test(expected = TimeoutException.class)
+    public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
+        testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
+                new Exception("reason")));
+    }
 
+    @Test(expected = TestException.class)
+    public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
+        testReadWithExceptionOnInitialCreateTransaction(new TestException());
     }
 
     @Test
-    public void testReadWhenAAnyOtherExceptionIsThrown() throws Exception {
-        final ActorContext actorContext = mock(ActorContext.class);
-
-        when(actorContext.executeShardOperation(anyString(), any(), any(
-            FiniteDuration.class))).thenThrow(new NullPointerException());
-
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+    public void testExists() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_ONLY, schemaContext);
 
-        try {
-            ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
-                transactionProxy.read(TestModel.TEST_PATH);
-            fail("A null pointer exception was expected");
-        } catch(NullPointerException e){
+        doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
 
-        }
-    }
+        Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
+        assertEquals("Exists response", false, exists);
 
+        doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
 
-    @Test
-    public void testWrite() throws Exception {
-        final Props props = Props.create(MessageCollectorActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
+        exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
-        final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
-        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
-        actorContext.setExecuteRemoteOperationResponse("message");
+        assertEquals("Exists response", true, exists);
+    }
 
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+    @Test(expected = PrimaryNotFoundException.class)
+    public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
+        testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
+            @Override
+            public void invoke(TransactionProxy proxy) throws Exception {
+                proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+            }
+        });
+    }
 
-        transactionProxy.write(TestModel.TEST_PATH,
-            ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+    @Test(expected = ReadFailedException.class)
+    public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception {
+        setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
-        Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
+        doReturn(Futures.successful(new Object())).when(mockActorContext).
+                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
-        Assert.assertNotNull(messages);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_ONLY, schemaContext);
 
-        Assert.assertTrue(messages instanceof List);
+        transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+    }
 
-        List<Object> listMessages = (List<Object>) messages;
+    @Test(expected = TestException.class)
+    public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
+        setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
-        Assert.assertEquals(1, listMessages.size());
+        doThrow(new TestException()).when(mockActorContext).
+                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
-        Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
-    }
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_ONLY, schemaContext);
 
-    private Object createPrimaryFound(ActorRef actorRef) {
-        return new PrimaryFound(actorRef.path().toString()).toSerializable();
+        try {
+            transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+            fail("Expected ReadFailedException");
+        } catch(ReadFailedException e) {
+            // Expected - throw cause - expects TestException.
+            throw e.getCause();
+        }
     }
 
     @Test
-    public void testMerge() throws Exception {
-        final Props props = Props.create(MessageCollectorActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
+    public void testWrite() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
 
-        final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
-        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
-        actorContext.setExecuteRemoteOperationResponse("message");
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
 
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        transactionProxy.merge(TestModel.TEST_PATH,
-            ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
+        verify(mockActorContext).sendRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+    }
 
-        Assert.assertNotNull(messages);
+    @Test
+    public void testMerge() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
 
-        Assert.assertTrue(messages instanceof List);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
 
-        List<Object> listMessages = (List<Object>) messages;
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        Assert.assertEquals(1, listMessages.size());
+        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
-        Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
+        verify(mockActorContext).sendRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
     }
 
     @Test
     public void testDelete() throws Exception {
-        final Props props = Props.create(MessageCollectorActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
-
-        final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
-        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
-        actorContext.setExecuteRemoteOperationResponse("message");
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
 
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
-        Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
-
-        Assert.assertNotNull(messages);
-
-        Assert.assertTrue(messages instanceof List);
-
-        List<Object> listMessages = (List<Object>) messages;
-
-        Assert.assertEquals(1, listMessages.size());
-
-        Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
+        verify(mockActorContext).sendRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqDeleteData());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testReady() throws Exception {
-        final Props props = Props.create(DoNothingActor.class);
-        final ActorRef doNothingActorRef = getSystem().actorOf(props);
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
 
-        final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef));
-        actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
-        actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable());
+        doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
 
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE, schemaContext);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
-        Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
-
+        assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths());
     }
 
     @Test
-    public void testGetIdentifier(){
-        final Props props = Props.create(DoNothingActor.class);
-        final ActorRef doNothingActorRef = getSystem().actorOf(props);
-
-        final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
-
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
-
-        Assert.assertNotNull(transactionProxy.getIdentifier());
+    public void testGetIdentifier() {
+        setupActorContextWithInitialCreateTransaction(READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                TransactionProxy.TransactionType.READ_ONLY, schemaContext);
+
+        Object id = transactionProxy.getIdentifier();
+        assertNotNull("getIdentifier returned null", id);
+        assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
     }
 
+    @SuppressWarnings("unchecked")
     @Test
-    public void testClose(){
-        final Props props = Props.create(MessageCollectorActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
+    public void testClose() throws Exception{
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
 
-        final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
-        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
-        actorContext.setExecuteRemoteOperationResponse("message");
+        doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
-        TransactionProxy transactionProxy =
-            new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE, schemaContext);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
         transactionProxy.close();
 
-        Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
-
-        Assert.assertNotNull(messages);
-
-        Assert.assertTrue(messages instanceof List);
-
-        List<Object> listMessages = (List<Object>) messages;
-
-        Assert.assertEquals(1, listMessages.size());
-
-        Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS));
-    }
-
-    private CreateTransactionReply createTransactionReply(ActorRef actorRef){
-        return CreateTransactionReply.newBuilder()
-            .setTransactionActorPath(actorRef.path().toString())
-            .setTransactionId("txn-1")
-            .build();
+        verify(mockActorContext).sendRemoteOperationAsync(
+                eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
     }
 }
index 5874eccda40f4f2d08f6b829757206213a743696..fda9ccdfdbbd44dda363c92b842d246e389466bb 100644 (file)
@@ -1,11 +1,14 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -14,6 +17,9 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
@@ -74,14 +80,23 @@ public class ActorContextTest extends AbstractActorTest{
         }
 
         private static Props props(final boolean found, final ActorRef actorRef){
-            return Props.create(new Creator<MockShardManager>() {
+            return Props.create(new MockShardManagerCreator(found, actorRef) );
+        }
 
-                @Override public MockShardManager create()
-                    throws Exception {
-                    return new MockShardManager(found,
-                        actorRef);
-                }
-            });
+        @SuppressWarnings("serial")
+        private static class MockShardManagerCreator implements Creator<MockShardManager> {
+            final boolean found;
+            final ActorRef actorRef;
+
+            MockShardManagerCreator(boolean found, ActorRef actorRef) {
+                this.found = found;
+                this.actorRef = actorRef;
+            }
+
+            @Override
+            public MockShardManager create() throws Exception {
+                return new MockShardManager(found, actorRef);
+            }
         }
     }
 
@@ -90,6 +105,7 @@ public class ActorContextTest extends AbstractActorTest{
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
@@ -118,6 +134,7 @@ public class ActorContextTest extends AbstractActorTest{
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef shardManagerActorRef = getSystem()
@@ -145,6 +162,7 @@ public class ActorContextTest extends AbstractActorTest{
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
@@ -173,6 +191,7 @@ public class ActorContextTest extends AbstractActorTest{
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef shardManagerActorRef = getSystem()
@@ -193,4 +212,68 @@ public class ActorContextTest extends AbstractActorTest{
         }};
 
     }
+
+    @Test
+    public void testExecuteRemoteOperation() {
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("3 seconds")) {
+                @Override
+                protected void run() {
+
+                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+                    ActorRef shardManagerActorRef = getSystem()
+                        .actorOf(MockShardManager.props(true, shardActorRef));
+
+                    ActorContext actorContext =
+                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
+
+                    ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+
+                    Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds"));
+
+                    assertEquals("hello", out);
+
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testExecuteRemoteOperationAsync() {
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("3 seconds")) {
+                @Override
+                protected void run() {
+
+                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+                    ActorRef shardManagerActorRef = getSystem()
+                        .actorOf(MockShardManager.props(true, shardActorRef));
+
+                    ActorContext actorContext =
+                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
+
+                    ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+
+                    Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello",
+                            Duration.create(3, TimeUnit.SECONDS));
+
+                    try {
+                        Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
+                        assertEquals("Result", "hello", result);
+                    } catch(Exception e) {
+                        throw new AssertionError(e);
+                    }
+
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
 }
index 5d3853f311880d791fc177c9a1082f026fb1c5de..b19fd3a5290b2c8df92917403d7a501bcf79bfcf 100644 (file)
@@ -8,7 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.utils;
 
-
+import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
@@ -16,10 +16,12 @@ import scala.concurrent.duration.FiniteDuration;
 
 public class MockActorContext extends ActorContext {
 
-    private Object executeShardOperationResponse;
-    private Object executeRemoteOperationResponse;
-    private Object executeLocalOperationResponse;
-    private Object executeLocalShardOperationResponse;
+    private volatile Object executeShardOperationResponse;
+    private volatile Object executeRemoteOperationResponse;
+    private volatile Object executeLocalOperationResponse;
+    private volatile Object executeLocalShardOperationResponse;
+    private volatile Exception executeRemoteOperationFailure;
+    private volatile Object inputMessage;
 
     public MockActorContext(ActorSystem actorSystem) {
         super(actorSystem, null, new MockClusterWrapper(), new MockConfiguration());
@@ -52,6 +54,10 @@ public class MockActorContext extends ActorContext {
         executeRemoteOperationResponse = response;
     }
 
+    public void setExecuteRemoteOperationFailure(Exception executeRemoteOperationFailure) {
+        this.executeRemoteOperationFailure = executeRemoteOperationFailure;
+    }
+
     public void setExecuteLocalOperationResponse(
         Object executeLocalOperationResponse) {
         this.executeLocalOperationResponse = executeLocalOperationResponse;
@@ -62,12 +68,20 @@ public class MockActorContext extends ActorContext {
         this.executeLocalShardOperationResponse = executeLocalShardOperationResponse;
     }
 
-    @Override public Object executeLocalOperation(ActorRef actor,
+    @SuppressWarnings("unchecked")
+    public <T> T getInputMessage(Class<T> expType) throws Exception {
+        assertNotNull("Input message was null", inputMessage);
+        return (T) expType.getMethod("fromSerializable", Object.class).invoke(null, inputMessage);
+    }
+
+    @Override
+    public Object executeLocalOperation(ActorRef actor,
         Object message, FiniteDuration duration) {
         return this.executeLocalOperationResponse;
     }
 
-    @Override public Object executeLocalShardOperation(String shardName,
+    @Override
+    public Object executeLocalShardOperation(String shardName,
         Object message, FiniteDuration duration) {
         return this.executeLocalShardOperationResponse;
     }