From 3145f4149ff58cd393819e24454a08b384122bb2 Mon Sep 17 00:00:00 2001 From: tpantelis Date: Thu, 14 Aug 2014 03:03:47 -0400 Subject: [PATCH] Bug 1534: Changed blocking calls to async in dist data store Changed the read and commit methods to use async akka actor calls instead of submitting tasks to a ListeningExecutorService that do blocking akka calls. This obviates the need for the ListeningExecutorService. Change-Id: I7a3f369917431067ad1817a3ab53a358bc21f123 Signed-off-by: tpantelis --- .../datastore/DistributedDataStore.java | 56 +- .../datastore/ShardReadWriteTransaction.java | 2 +- .../datastore/ShardWriteTransaction.java | 2 +- .../ThreePhaseCommitCohortProxy.java | 190 +++--- .../datastore/TransactionChainProxy.java | 13 +- .../cluster/datastore/TransactionProxy.java | 249 ++++---- .../datastore/messages/DeleteData.java | 2 +- .../cluster/datastore/utils/ActorContext.java | 32 +- .../ThreePhaseCommitCohortProxyTest.java | 231 +++++-- .../datastore/TransactionProxyTest.java | 570 ++++++++++-------- .../datastore/utils/ActorContextTest.java | 97 ++- .../datastore/utils/MockActorContext.java | 28 +- 12 files changed, 928 insertions(+), 544 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 4fa26ffb20..404a4e0203 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -10,9 +10,8 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSystem; + import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; @@ -28,8 +27,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.util.PropertyUtils; -import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -42,37 +39,12 @@ import org.slf4j.LoggerFactory; */ public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable { - private static final Logger - LOG = LoggerFactory.getLogger(DistributedDataStore.class); - - private static final String EXECUTOR_MAX_POOL_SIZE_PROP = - "mdsal.dist-datastore-executor-pool.size"; - private static final int DEFAULT_EXECUTOR_MAX_POOL_SIZE = 10; - - private static final String EXECUTOR_MAX_QUEUE_SIZE_PROP = - "mdsal.dist-datastore-executor-queue.size"; - private static final int DEFAULT_EXECUTOR_MAX_QUEUE_SIZE = 5000; + private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); private final ActorContext actorContext; private SchemaContext schemaContext; - /** - * Executor used to run FutureTask's - * - * This is typically used when we need to make a request to an actor and - * wait for it's response and the consumer needs to be provided a Future. - */ - private final ListeningExecutorService executor = - MoreExecutors.listeningDecorator( - SpecialExecutors.newBlockingBoundedFastThreadPool( - PropertyUtils.getIntSystemProperty( - EXECUTOR_MAX_POOL_SIZE_PROP, - DEFAULT_EXECUTOR_MAX_POOL_SIZE), - PropertyUtils.getIntSystemProperty( - EXECUTOR_MAX_QUEUE_SIZE_PROP, - DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore")); - public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration, InMemoryDOMDataStoreConfigProperties dataStoreProperties) { Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); @@ -95,15 +67,16 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au } + @SuppressWarnings("unchecked") @Override - public >> ListenerRegistration registerChangeListener( + public >> + ListenerRegistration registerChangeListener( YangInstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) { Preconditions.checkNotNull(path, "path should not be null"); Preconditions.checkNotNull(listener, "listener should not be null"); - LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope); ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf( @@ -112,10 +85,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); Object result = actorContext.executeLocalShardOperation(shardName, - new RegisterChangeListener(path, dataChangeListenerActor.path(), - scope), - ActorContext.ASK_DURATION - ); + new RegisterChangeListener(path, dataChangeListenerActor.path(), scope), + ActorContext.ASK_DURATION); if (result != null) { RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; @@ -127,34 +98,31 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au LOG.debug( "No local shard for shardName {} was found so returning a noop registration", shardName); + return new NoOpDataChangeListenerRegistration(listener); } - - - - @Override public DOMStoreTransactionChain createTransactionChain() { - return new TransactionChainProxy(actorContext, executor, schemaContext); + return new TransactionChainProxy(actorContext, schemaContext); } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY, - executor, schemaContext); + schemaContext); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY, - executor, schemaContext); + schemaContext); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE, - executor, schemaContext); + schemaContext); } @Override public void onGlobalContextUpdated(SchemaContext schemaContext) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java index 97bb196f9f..49c7b7e78f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java @@ -53,7 +53,7 @@ public class ShardReadWriteTransaction extends ShardTransaction { } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) { mergeData(transaction, MergeData.fromSerializable(message, schemaContext)); } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) { - deleteData(transaction,DeleteData.fromSerizalizable(message)); + deleteData(transaction,DeleteData.fromSerializable(message)); } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { readyTransaction(transaction,new ReadyTransaction()); } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 91e578b46d..b01fe7d4ac 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -50,7 +50,7 @@ public class ShardWriteTransaction extends ShardTransaction { } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) { mergeData(transaction, MergeData.fromSerializable(message, schemaContext)); } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) { - deleteData(transaction,DeleteData.fromSerizalizable(message)); + deleteData(transaction,DeleteData.fromSerializable(message)); } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { readyTransaction(transaction,new ReadyTransaction()); }else { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 5b447943ea..fc455b193e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -10,11 +10,13 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorSelection; +import akka.dispatch.Futures; +import akka.dispatch.OnComplete; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.SettableFuture; -import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; @@ -28,124 +30,156 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; /** * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies */ -public class ThreePhaseCommitCohortProxy implements - DOMStoreThreePhaseCommitCohort{ +public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{ - private static final Logger - LOG = LoggerFactory.getLogger(DistributedDataStore.class); + private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); private final ActorContext actorContext; private final List cohortPaths; - private final ListeningExecutorService executor; private final String transactionId; - - public ThreePhaseCommitCohortProxy(ActorContext actorContext, - List cohortPaths, - String transactionId, - ListeningExecutorService executor) { - + public ThreePhaseCommitCohortProxy(ActorContext actorContext, List cohortPaths, + String transactionId) { this.actorContext = actorContext; this.cohortPaths = cohortPaths; this.transactionId = transactionId; - this.executor = executor; } - @Override public ListenableFuture canCommit() { + @Override + public ListenableFuture canCommit() { LOG.debug("txn {} canCommit", transactionId); - Callable call = new Callable() { + Future> combinedFuture = + invokeCohorts(new CanCommitTransaction().toSerializable()); + + final SettableFuture returnFuture = SettableFuture.create(); + + combinedFuture.onComplete(new OnComplete>() { @Override - public Boolean call() throws Exception { - for(ActorPath actorPath : cohortPaths){ - - Object message = new CanCommitTransaction().toSerializable(); - LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath); - - ActorSelection cohort = actorContext.actorSelection(actorPath); - - try { - Object response = - actorContext.executeRemoteOperation(cohort, - message, - ActorContext.ASK_DURATION); - - if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { - CanCommitTransactionReply reply = - CanCommitTransactionReply.fromSerializable(response); - if (!reply.getCanCommit()) { - return false; - } + public void onComplete(Throwable failure, Iterable responses) throws Throwable { + if(failure != null) { + returnFuture.setException(failure); + return; + } + + boolean result = true; + for(Object response: responses) { + if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { + CanCommitTransactionReply reply = + CanCommitTransactionReply.fromSerializable(response); + if (!reply.getCanCommit()) { + result = false; + break; } - } catch(RuntimeException e){ - // FIXME : Need to properly handle this - LOG.error("Unexpected Exception", e); - return false; + } else { + LOG.error("Unexpected response type {}", response.getClass()); + returnFuture.setException(new IllegalArgumentException( + String.format("Unexpected response type {}", response.getClass()))); + return; } } - return true; + returnFuture.set(Boolean.valueOf(result)); } - }; + }, actorContext.getActorSystem().dispatcher()); + + return returnFuture; + } + + private Future> invokeCohorts(Object message) { + List> futureList = Lists.newArrayListWithCapacity(cohortPaths.size()); + for(ActorPath actorPath : cohortPaths) { + + LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath); - return executor.submit(call); + ActorSelection cohort = actorContext.actorSelection(actorPath); + + futureList.add(actorContext.executeRemoteOperationAsync(cohort, message, + ActorContext.ASK_DURATION)); + } + + return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher()); } - @Override public ListenableFuture preCommit() { + @Override + public ListenableFuture preCommit() { LOG.debug("txn {} preCommit", transactionId); - return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS); + return voidOperation(new PreCommitTransaction().toSerializable(), + PreCommitTransactionReply.SERIALIZABLE_CLASS, true); } - @Override public ListenableFuture abort() { + @Override + public ListenableFuture abort() { LOG.debug("txn {} abort", transactionId); - return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS); + + // Note - we pass false for propagateException. In the front-end data broker, this method + // is called when one of the 3 phases fails with an exception. We'd rather have that + // original exception propagated to the client. If our abort fails and we propagate the + // exception then that exception will supersede and suppress the original exception. But + // it's the original exception that is the root cause and of more interest to the client. + + return voidOperation(new AbortTransaction().toSerializable(), + AbortTransactionReply.SERIALIZABLE_CLASS, false); } - @Override public ListenableFuture commit() { + @Override + public ListenableFuture commit() { LOG.debug("txn {} commit", transactionId); - return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS); + return voidOperation(new CommitTransaction().toSerializable(), + CommitTransactionReply.SERIALIZABLE_CLASS, true); } - private ListenableFuture voidOperation(final Object message, final Class expectedResponseClass){ - Callable call = new Callable() { - - @Override public Void call() throws Exception { - for(ActorPath actorPath : cohortPaths){ - ActorSelection cohort = actorContext.actorSelection(actorPath); - - LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath); - - try { - Object response = - actorContext.executeRemoteOperation(cohort, - message, - ActorContext.ASK_DURATION); - - if (response != null && !response.getClass() - .equals(expectedResponseClass)) { - throw new RuntimeException( - String.format( - "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s", - expectedResponseClass.toString(), - response.getClass().toString()) - ); + private ListenableFuture voidOperation(final Object message, + final Class expectedResponseClass, final boolean propagateException) { + + Future> combinedFuture = invokeCohorts(message); + + final SettableFuture returnFuture = SettableFuture.create(); + + combinedFuture.onComplete(new OnComplete>() { + @Override + public void onComplete(Throwable failure, Iterable responses) throws Throwable { + + Throwable exceptionToPropagate = failure; + if(exceptionToPropagate == null) { + for(Object response: responses) { + if(!response.getClass().equals(expectedResponseClass)) { + exceptionToPropagate = new IllegalArgumentException( + String.format("Unexpected response type {}", + response.getClass())); + break; } - } catch(TimeoutException e){ - LOG.error(String.format("A timeout occurred when processing operation : %s", message)); } } - return null; + + if(exceptionToPropagate != null) { + if(propagateException) { + // We don't log the exception here to avoid redundant logging since we're + // propagating to the caller in MD-SAL core who will log it. + returnFuture.setException(exceptionToPropagate); + } else { + // Since the caller doesn't want us to propagate the exception we'll also + // not log it normally. But it's usually not good to totally silence + // exceptions so we'll log it to debug level. + LOG.debug(String.format("%s failed", message.getClass().getSimpleName()), + exceptionToPropagate); + returnFuture.set(null); + } + } else { + returnFuture.set(null); + } } - }; + }, actorContext.getActorSystem().dispatcher()); - return executor.submit(call); + return returnFuture; } public List getCohortPaths() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 5e9defa5b5..c4ec760b40 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -15,39 +15,34 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import com.google.common.util.concurrent.ListeningExecutorService; - /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain{ private final ActorContext actorContext; - private final ListeningExecutorService transactionExecutor; private final SchemaContext schemaContext; - public TransactionChainProxy(ActorContext actorContext, ListeningExecutorService transactionExecutor, - SchemaContext schemaContext) { + public TransactionChainProxy(ActorContext actorContext, SchemaContext schemaContext) { this.actorContext = actorContext; - this.transactionExecutor = transactionExecutor; this.schemaContext = schemaContext; } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, schemaContext); + TransactionProxy.TransactionType.READ_ONLY, schemaContext); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor, schemaContext); + TransactionProxy.TransactionType.WRITE_ONLY, schemaContext); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_WRITE, transactionExecutor, schemaContext); + TransactionProxy.TransactionType.READ_WRITE, schemaContext); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 95862ae9d9..5b5b1296af 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -12,13 +12,14 @@ import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; +import akka.dispatch.OnComplete; + import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListeningExecutorService; -import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; -import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; +import com.google.common.util.concurrent.SettableFuture; + import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; @@ -44,11 +45,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; /** @@ -80,25 +82,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final ActorContext actorContext; private final Map remoteTransactionPaths = new HashMap<>(); private final TransactionIdentifier identifier; - private final ListeningExecutorService executor; private final SchemaContext schemaContext; + private boolean inReadyState; - public TransactionProxy( - ActorContext actorContext, - TransactionType transactionType, - ListeningExecutorService executor, - SchemaContext schemaContext - ) { + public TransactionProxy(ActorContext actorContext, TransactionType transactionType, + SchemaContext schemaContext) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null"); - this.executor = Preconditions.checkNotNull(executor, "executor should not be null"); this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null"); String memberName = actorContext.getCurrentMemberName(); if(memberName == null){ memberName = "UNKNOWN-MEMBER"; } - this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build(); + + this.identifier = TransactionIdentifier.builder().memberName(memberName).counter( + counter.getAndIncrement()).build(); LOG.debug("Created txn {}", identifier); @@ -108,6 +107,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture>, ReadFailedException> read( final YangInstanceIdentifier path) { + Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, + "Read operation on write-only transaction is not allowed"); + LOG.debug("txn {} read {}", identifier, path); createTransactionIfMissing(actorContext, path); @@ -115,8 +117,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return transactionContext(path).readData(path); } - @Override public CheckedFuture exists( - YangInstanceIdentifier path) { + @Override + public CheckedFuture exists(YangInstanceIdentifier path) { + + Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, + "Exists operation on write-only transaction is not allowed"); + LOG.debug("txn {} exists {}", identifier, path); createTransactionIfMissing(actorContext, path); @@ -124,9 +130,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return transactionContext(path).dataExists(path); } + private void checkModificationState() { + Preconditions.checkState(transactionType != TransactionType.READ_ONLY, + "Modification operation on read-only transaction is not allowed"); + Preconditions.checkState(!inReadyState, + "Transaction is sealed - further modifications are allowed"); + } + @Override public void write(YangInstanceIdentifier path, NormalizedNode data) { + checkModificationState(); + LOG.debug("txn {} write {}", identifier, path); createTransactionIfMissing(actorContext, path); @@ -137,6 +152,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void merge(YangInstanceIdentifier path, NormalizedNode data) { + checkModificationState(); + LOG.debug("txn {} merge {}", identifier, path); createTransactionIfMissing(actorContext, path); @@ -147,6 +164,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void delete(YangInstanceIdentifier path) { + checkModificationState(); + LOG.debug("txn {} delete {}", identifier, path); createTransactionIfMissing(actorContext, path); @@ -156,25 +175,36 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public DOMStoreThreePhaseCommitCohort ready() { + + checkModificationState(); + + inReadyState = true; + List cohortPaths = new ArrayList<>(); - LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size()); + LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, + remoteTransactionPaths.size()); for(TransactionContext transactionContext : remoteTransactionPaths.values()) { - LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName()); + LOG.debug("txn {} Readying transaction for shard {}", identifier, + transactionContext.getShardName()); Object result = transactionContext.readyTransaction(); if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){ - ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result); - String resolvedCohortPath = transactionContext - .getResolvedCohortPath(reply.getCohortPath().toString()); + ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable( + actorContext.getActorSystem(),result); + String resolvedCohortPath = transactionContext.getResolvedCohortPath( + reply.getCohortPath().toString()); cohortPaths.add(actorContext.actorFor(resolvedCohortPath)); + } else { + LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS, + result.getClass()); } } - return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor); + return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString()); } @Override @@ -213,8 +243,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { Object response = actorContext.executeShardOperation(shardName, new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(), ActorContext.ASK_DURATION); - if (response.getClass() - .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { + if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response); @@ -229,11 +258,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { transactionActor); remoteTransactionPaths.put(shardName, transactionContext); + } else { + LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS, + response.getClass()); } - } catch(TimeoutException | PrimaryNotFoundException e){ + } catch(Exception e){ LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); - remoteTransactionPaths.put(shardName, - new NoOpTransactionContext(shardName)); + remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e)); } } @@ -272,7 +303,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { this.actor = actor; } - @Override public String getShardName() { + @Override + public String getShardName() { return shardName; } @@ -280,96 +312,105 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return actor; } - @Override public String getResolvedCohortPath(String cohortPath) { + @Override + public String getResolvedCohortPath(String cohortPath) { return actorContext.resolvePath(actorPath, cohortPath); } - @Override public void closeTransaction() { - getActor().tell( - new CloseTransaction().toSerializable(), null); + @Override + public void closeTransaction() { + actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable()); } - @Override public Object readyTransaction() { + @Override + public Object readyTransaction() { return actorContext.executeRemoteOperation(getActor(), - new ReadyTransaction().toSerializable(), - ActorContext.ASK_DURATION - ); - + new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION); } - @Override public void deleteData(YangInstanceIdentifier path) { - getActor().tell(new DeleteData(path).toSerializable(), null); + @Override + public void deleteData(YangInstanceIdentifier path) { + actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() ); } - @Override public void mergeData(YangInstanceIdentifier path, - NormalizedNode data) { - getActor() - .tell(new MergeData(path, data, schemaContext).toSerializable(), - null); + @Override + public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { + actorContext.sendRemoteOperationAsync(getActor(), + new MergeData(path, data, schemaContext).toSerializable()); } @Override public CheckedFuture>, ReadFailedException> readData( final YangInstanceIdentifier path) { - Callable>> call = - new Callable>>() { - - @Override public Optional> call() - throws Exception { - Object response = actorContext - .executeRemoteOperation(getActor(), - new ReadData(path).toSerializable(), - ActorContext.ASK_DURATION); - if (response.getClass() - .equals(ReadDataReply.SERIALIZABLE_CLASS)) { - ReadDataReply reply = ReadDataReply - .fromSerializable(schemaContext, path, - response); + final SettableFuture>> returnFuture = SettableFuture.create(); + + OnComplete onComplete = new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) throws Throwable { + if(failure != null) { + returnFuture.setException(new ReadFailedException( + "Error reading data for path " + path, failure)); + } else { + if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { + ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, + path, response); if (reply.getNormalizedNode() == null) { - return Optional.absent(); + returnFuture.set(Optional.>absent()); + } else { + returnFuture.set(Optional.>of( + reply.getNormalizedNode())); } - return Optional.>of( - reply.getNormalizedNode()); + } else { + returnFuture.setException(new ReadFailedException( + "Invalid response reading data for path " + path)); } - - throw new ReadFailedException("Read Failed " + path); } - }; + } + }; - return MappingCheckedFuture - .create(executor.submit(call), ReadFailedException.MAPPER); - } + Future future = actorContext.executeRemoteOperationAsync(getActor(), + new ReadData(path).toSerializable(), ActorContext.ASK_DURATION); + future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); - @Override public void writeData(YangInstanceIdentifier path, - NormalizedNode data) { - getActor() - .tell(new WriteData(path, data, schemaContext).toSerializable(), - null); + return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER); } - @Override public CheckedFuture dataExists( - final YangInstanceIdentifier path) { - - Callable call = new Callable() { - - @Override public Boolean call() throws Exception { - Object o = actorContext.executeRemoteOperation(getActor(), - new DataExists(path).toSerializable(), - ActorContext.ASK_DURATION - ); - + @Override + public void writeData(YangInstanceIdentifier path, NormalizedNode data) { + actorContext.sendRemoteOperationAsync(getActor(), + new WriteData(path, data, schemaContext).toSerializable()); + } - if (DataExistsReply.SERIALIZABLE_CLASS - .equals(o.getClass())) { - return DataExistsReply.fromSerializable(o).exists(); + @Override + public CheckedFuture dataExists( + final YangInstanceIdentifier path) { + + final SettableFuture returnFuture = SettableFuture.create(); + + OnComplete onComplete = new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) throws Throwable { + if(failure != null) { + returnFuture.setException(new ReadFailedException( + "Error checking exists for path " + path, failure)); + } else { + if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { + returnFuture.set(Boolean.valueOf(DataExistsReply. + fromSerializable(response).exists())); + } else { + returnFuture.setException(new ReadFailedException( + "Invalid response checking exists for path " + path)); + } } - - throw new ReadFailedException("Exists Failed " + path); } }; - return MappingCheckedFuture - .create(executor.submit(call), ReadFailedException.MAPPER); + + Future future = actorContext.executeRemoteOperationAsync(getActor(), + new DataExists(path).toSerializable(), ActorContext.ASK_DURATION); + future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + + return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER); } } @@ -379,22 +420,28 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG = LoggerFactory.getLogger(NoOpTransactionContext.class); private final String shardName; + private final Exception failure; private ActorRef cohort; - public NoOpTransactionContext(String shardName){ + public NoOpTransactionContext(String shardName, Exception failure){ this.shardName = shardName; + this.failure = failure; } - @Override public String getShardName() { + + @Override + public String getShardName() { return shardName; } - @Override public String getResolvedCohortPath(String cohortPath) { + @Override + public String getResolvedCohortPath(String cohortPath) { return cohort.path().toString(); } - @Override public void closeTransaction() { + @Override + public void closeTransaction() { LOG.warn("txn {} closeTransaction called", identifier); } @@ -404,11 +451,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return new ReadyTransactionReply(cohort.path()).toSerializable(); } - @Override public void deleteData(YangInstanceIdentifier path) { + @Override + public void deleteData(YangInstanceIdentifier path) { LOG.warn("txt {} deleteData called path = {}", identifier, path); } - @Override public void mergeData(YangInstanceIdentifier path, + @Override + public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.warn("txn {} mergeData called path = {}", identifier, path); } @@ -417,8 +466,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture>, ReadFailedException> readData( YangInstanceIdentifier path) { LOG.warn("txn {} readData called path = {}", identifier, path); - return Futures.immediateCheckedFuture( - Optional.>absent()); + return Futures.immediateFailedCheckedFuture(new ReadFailedException( + "Error reading data for path " + path, failure)); } @Override public void writeData(YangInstanceIdentifier path, @@ -429,10 +478,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public CheckedFuture dataExists( YangInstanceIdentifier path) { LOG.warn("txn {} dataExists called path = {}", identifier, path); - - // Returning false instead of an exception to keep this aligned with - // read - return Futures.immediateCheckedFuture(false); + return Futures.immediateFailedCheckedFuture(new ReadFailedException( + "Error checking exists for path " + path, failure)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java index 17861a5a68..9ae851e76c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java @@ -31,7 +31,7 @@ public class DeleteData implements SerializableMessage { .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path)).build(); } - public static DeleteData fromSerizalizable(Object serializable){ + public static DeleteData fromSerializable(Object serializable){ ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable; return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments())); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 4706c66e25..e12a9663d1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -14,6 +14,7 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.util.Timeout; + import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -22,9 +23,9 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -54,8 +55,6 @@ public class ActorContext { private final ClusterWrapper clusterWrapper; private final Configuration configuration; - private SchemaContext schemaContext = null; - public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -174,6 +173,33 @@ public class ActorContext { } } + /** + * Execute an operation on a remote actor asynchronously. + * + * @param actor the ActorSelection + * @param message the message to send + * @param duration the maximum amount of time to send he message + * @return a Future containing the eventual result + */ + public Future executeRemoteOperationAsync(ActorSelection actor, Object message, + FiniteDuration duration) { + + LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString()); + + return ask(actor, message, new Timeout(duration)); + } + + /** + * Sends an operation to be executed by a remote actor asynchronously without waiting for a + * reply (essentially set and forget). + * + * @param actor the ActorSelection + * @param message the message to send + */ + public void sendRemoteOperationAsync(ActorSelection actor, Object message) { + actor.tell(message, ActorRef.noSender()); + } + /** * Execute an operation on the primary for a given shard *

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 4eca5671f6..87231f0884 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -1,96 +1,245 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorRef; +import akka.actor.ActorPath; +import akka.actor.ActorSelection; import akka.actor.Props; +import akka.dispatch.Futures; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - -import junit.framework.Assert; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; -import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.stubbing.Stubber; +import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; +import scala.concurrent.duration.FiniteDuration; -import java.util.Arrays; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertNotNull; +import java.util.List; +import java.util.concurrent.ExecutionException; public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { - private ThreePhaseCommitCohortProxy proxy; - private Props props; - private ActorRef actorRef; - private MockActorContext actorContext; - private final ListeningExecutorService executor = MoreExecutors.listeningDecorator( - Executors.newSingleThreadExecutor()); + @Mock + private ActorContext actorContext; @Before - public void setUp(){ - props = Props.create(MessageCollectorActor.class); - actorRef = getSystem().actorOf(props); - actorContext = new MockActorContext(this.getSystem()); + public void setUp() { + MockitoAnnotations.initMocks(this); - proxy = - new ThreePhaseCommitCohortProxy(actorContext, - Arrays.asList(actorRef.path()), "txn-1", executor); + doReturn(getSystem()).when(actorContext).getActorSystem(); + } + private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) { + List cohorts = Lists.newArrayList(); + for(int i = 1; i <= nCohorts; i++) { + ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path(); + cohorts.add(path); + doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path); + } + + return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); } - @After - public void tearDown() { - executor.shutdownNow(); + private void setupMockActorContext(Class requestType, Object... responses) { + Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures + .failed((Throwable) responses[0]) : Futures + .successful(((SerializableMessage) responses[0]).toSerializable())); + + for(int i = 1; i < responses.length; i++) { + stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures + .failed((Throwable) responses[i]) : Futures + .successful(((SerializableMessage) responses[i]).toSerializable())); + } + + stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class), + isA(requestType), any(FiniteDuration.class)); + } + + private void verifyCohortInvocations(int nCohorts, Class requestType) { + verify(actorContext, times(nCohorts)).executeRemoteOperationAsync( + any(ActorSelection.class), isA(requestType), any(FiniteDuration.class)); + } + + @Test + public void testCanCommitWithOneCohort() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(1); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, + new CanCommitTransactionReply(true)); + + ListenableFuture future = proxy.canCommit(); + + assertEquals("canCommit", true, future.get()); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, + new CanCommitTransactionReply(false)); + + future = proxy.canCommit(); + + assertEquals("canCommit", false, future.get()); + + verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); } @Test - public void testCanCommit() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable()); + public void testCanCommitWithMultipleCohorts() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(2); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, + new CanCommitTransactionReply(true), new CanCommitTransactionReply(true)); ListenableFuture future = proxy.canCommit(); - Assert.assertTrue(future.get().booleanValue()); + assertEquals("canCommit", true, future.get()); + verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); + } + + @Test + public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(3); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, + new CanCommitTransactionReply(true), new CanCommitTransactionReply(false), + new CanCommitTransactionReply(true)); + + ListenableFuture future = proxy.canCommit(); + + assertEquals("canCommit", false, future.get()); + + verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS); + } + + @Test(expected = ExecutionException.class) + public void testCanCommitWithExceptionFailure() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(1); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock")); + + proxy.canCommit().get(); + } + + @Test(expected = ExecutionException.class) + public void testCanCommitWithInvalidResponseType() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(1); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, + new PreCommitTransactionReply()); + + proxy.canCommit().get(); } @Test public void testPreCommit() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply().toSerializable()); + ThreePhaseCommitCohortProxy proxy = setupProxy(1); - ListenableFuture future = proxy.preCommit(); + setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, + new PreCommitTransactionReply()); - future.get(); + proxy.preCommit().get(); + verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS); + } + + @Test(expected = ExecutionException.class) + public void testPreCommitWithFailure() throws Exception { + ThreePhaseCommitCohortProxy proxy = setupProxy(2); + + setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, + new PreCommitTransactionReply(), new RuntimeException("mock")); + + proxy.preCommit().get(); } @Test public void testAbort() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply().toSerializable()); + ThreePhaseCommitCohortProxy proxy = setupProxy(1); - ListenableFuture future = proxy.abort(); + setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply()); - future.get(); + proxy.abort().get(); + verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS); + } + + @Test + public void testAbortWithFailure() throws Exception { + ThreePhaseCommitCohortProxy proxy = setupProxy(1); + + setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock")); + + // The exception should not get propagated. + proxy.abort().get(); + + verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS); } @Test public void testCommit() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply().toSerializable()); - ListenableFuture future = proxy.commit(); + ThreePhaseCommitCohortProxy proxy = setupProxy(2); + + setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), + new CommitTransactionReply()); + + proxy.commit().get(); + + verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS); + } + + @Test(expected = ExecutionException.class) + public void testCommitWithFailure() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(2); - future.get(); + setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), + new RuntimeException("mock")); + + proxy.commit().get(); + } + + @Test(expected = ExecutionException.class) + public void teseCommitWithInvalidResponseType() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(1); + + setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply()); + + proxy.commit().get(); } @Test - public void testGetCohortPaths() throws Exception { - assertNotNull(proxy.getCohortPaths()); + public void testGetCohortPaths() { + + ThreePhaseCommitCohortProxy proxy = setupProxy(2); + + List paths = proxy.getCohortPaths(); + assertNotNull("getCohortPaths returned null", paths); + assertEquals("getCohortPaths size", 2, paths.size()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 62052f38ab..14696f786e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1,32 +1,44 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import akka.actor.ActorPath; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.Props; +import akka.dispatch.Futures; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import junit.framework.Assert; -import org.junit.After; + import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; + +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; -import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -34,377 +46,433 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -import java.util.List; -import java.util.concurrent.Executors; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; -import static junit.framework.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.isA; + +@SuppressWarnings("resource") public class TransactionProxyTest extends AbstractActorTest { + @SuppressWarnings("serial") + static class TestException extends RuntimeException { + } + + static interface Invoker { + void invoke(TransactionProxy proxy) throws Exception; + } + private final Configuration configuration = new MockConfiguration(); - private final ActorContext testContext = - new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration ); + @Mock + private ActorContext mockActorContext; - private final ListeningExecutorService transactionExecutor = - MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + private SchemaContext schemaContext; + + String memberName = "mock-member"; @Before public void setUp(){ - ShardStrategyFactory.setConfiguration(configuration); - } + MockitoAnnotations.initMocks(this); - @After - public void tearDown() { - transactionExecutor.shutdownNow(); - } + schemaContext = TestModel.createTestContext(); - @Test - public void testRead() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + doReturn(getSystem()).when(mockActorContext).getActorSystem(); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + ShardStrategyFactory.setConfiguration(configuration); + } + private CreateTransaction eqCreateTransaction(final String memberName, + final TransactionType type) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + CreateTransaction obj = CreateTransaction.fromSerializable(argument); + return obj.getTransactionId().startsWith(memberName) && + obj.getTransactionType() == type.ordinal(); + } + }; + + return argThat(matcher); + } - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + private DataExists eqDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + DataExists obj = DataExists.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; + return argThat(matcher); + } - actorContext.setExecuteRemoteOperationResponse( - new ReadDataReply(TestModel.createTestContext(), null) - .toSerializable()); + private ReadData eqReadData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + ReadData obj = ReadData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + return argThat(matcher); + } - Optional> normalizedNodeOptional = read.get(); + private WriteData eqWriteData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + WriteData obj = WriteData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return argThat(matcher); + } - Assert.assertFalse(normalizedNodeOptional.isPresent()); + private MergeData eqMergeData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + MergeData obj = MergeData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return argThat(matcher); + } - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable()); + private DeleteData eqDeleteData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + DeleteData obj = DeleteData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; - read = transactionProxy.read(TestModel.TEST_PATH); + return argThat(matcher); + } - normalizedNodeOptional = read.get(); + private Object readyTxReply(ActorPath path) { + return new ReadyTransactionReply(path).toSerializable(); + } - Assert.assertTrue(normalizedNodeOptional.isPresent()); + private Future readDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(schemaContext, data) + .toSerializable()); } - @Test - public void testExists() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + private Future dataExistsReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists).toSerializable()); + } - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + private ActorSelection actorSelection(ActorRef actorRef) { + return getSystem().actorSelection(actorRef.path()); + } + private FiniteDuration anyDuration() { + return any(FiniteDuration.class); + } - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + private CreateTransactionReply createTransactionReply(ActorRef actorRef){ + return CreateTransactionReply.newBuilder() + .setTransactionActorPath(actorRef.path().toString()) + .setTransactionId("txn-1").build(); + } + private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) { + ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); + doReturn(getSystem().actorSelection(actorRef.path())). + when(mockActorContext).actorSelection(actorRef.path().toString()); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + doReturn(createTransactionReply(actorRef)).when(mockActorContext). + executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD), + eqCreateTransaction(memberName, type), anyDuration()); + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath( + anyString(), eq(actorRef.path().toString())); + doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString()); + + return actorRef; + } - actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(false).toSerializable()); + @Test + public void testRead() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); - CheckedFuture exists = - transactionProxy.exists(TestModel.TEST_PATH); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - Assert.assertFalse(exists.checkedGet()); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(true).toSerializable()); + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - exists = transactionProxy.exists(TestModel.TEST_PATH); + assertEquals("NormalizedNode isPresent", false, readOptional.isPresent()); - Assert.assertTrue(exists.checkedGet()); + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - actorContext.setExecuteRemoteOperationResponse("bad message"); + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - exists = transactionProxy.exists(TestModel.TEST_PATH); + readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - try { - exists.checkedGet(); - fail(); - } catch(ReadFailedException e){ - } + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); } @Test(expected = ReadFailedException.class) public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); - - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); - - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + setupActorContextWithInitialCreateTransaction(READ_ONLY); + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - CheckedFuture>, ReadFailedException> - read = transactionProxy.read(TestModel.TEST_PATH); - - read.checkedGet(); + transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); } - @Test - public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception { - final ActorContext actorContext = mock(ActorContext.class); - - when(actorContext.executeShardOperation(anyString(), any(), any( - FiniteDuration.class))).thenThrow(new PrimaryNotFoundException("test")); + @Test(expected = TestException.class) + public void testReadWithAsyncRemoteOperatonFailure() throws Throwable { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + doThrow(new TestException()).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); - - Assert.assertFalse(read.get().isPresent()); - + try { + transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. + throw e.getCause(); + } } + private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) + throws Throwable { - @Test - public void testReadWhenATimeoutExceptionIsThrown() throws Exception { - final ActorContext actorContext = mock(ActorContext.class); + doThrow(exToThrow).when(mockActorContext).executeShardOperation( + anyString(), any(), anyDuration()); - when(actorContext.executeShardOperation(anyString(), any(), any( - FiniteDuration.class))).thenThrow(new TimeoutException("test", new Exception("reason"))); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + try { + invoker.invoke(transactionProxy); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. + throw e.getCause(); + } + } + private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable { + testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() { + @Override + public void invoke(TransactionProxy proxy) throws Exception { + proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } + }); + } - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + @Test(expected = PrimaryNotFoundException.class) + public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test")); + } - Assert.assertFalse(read.get().isPresent()); + @Test(expected = TimeoutException.class) + public void testReadWhenATimeoutExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test", + new Exception("reason"))); + } + @Test(expected = TestException.class) + public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TestException()); } @Test - public void testReadWhenAAnyOtherExceptionIsThrown() throws Exception { - final ActorContext actorContext = mock(ActorContext.class); - - when(actorContext.executeShardOperation(anyString(), any(), any( - FiniteDuration.class))).thenThrow(new NullPointerException()); - - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + public void testExists() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - try { - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); - fail("A null pointer exception was expected"); - } catch(NullPointerException e){ + doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); - } - } + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + assertEquals("Exists response", false, exists); + doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); - @Test - public void testWrite() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + assertEquals("Exists response", true, exists); + } - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + @Test(expected = PrimaryNotFoundException.class) + public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() { + @Override + public void invoke(TransactionProxy proxy) throws Exception { + proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } + }); + } - transactionProxy.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + @Test(expected = ReadFailedException.class) + public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); - Assert.assertNotNull(messages); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - Assert.assertTrue(messages instanceof List); + transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } - List listMessages = (List) messages; + @Test(expected = TestException.class) + public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - Assert.assertEquals(1, listMessages.size()); + doThrow(new TestException()).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); - Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); - } + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - private Object createPrimaryFound(ActorRef actorRef) { - return new PrimaryFound(actorRef.path().toString()).toSerializable(); + try { + transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. + throw e.getCause(); + } } @Test - public void testMerge() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - transactionProxy.merge(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + } - Assert.assertNotNull(messages); + @Test + public void testMerge() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - Assert.assertTrue(messages instanceof List); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); - List listMessages = (List) messages; + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - Assert.assertEquals(1, listMessages.size()); + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); } @Test public void testDelete() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); - - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); transactionProxy.delete(TestModel.TEST_PATH); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); - - Assert.assertNotNull(messages); - - Assert.assertTrue(messages instanceof List); - - List listMessages = (List) messages; - - Assert.assertEquals(1, listMessages.size()); - - Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDeleteData()); } + @SuppressWarnings("unchecked") @Test public void testReady() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef)); - actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable()); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE, schemaContext); transactionProxy.read(TestModel.TEST_PATH); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0); - + assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths()); } @Test - public void testGetIdentifier(){ - final Props props = Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); - - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) ); - - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); - - Assert.assertNotNull(transactionProxy.getIdentifier()); + public void testGetIdentifier() { + setupActorContextWithInitialCreateTransaction(READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + TransactionProxy.TransactionType.READ_ONLY, schemaContext); + + Object id = transactionProxy.getIdentifier(); + assertNotNull("getIdentifier returned null", id); + assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName)); } + @SuppressWarnings("unchecked") @Test - public void testClose(){ - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testClose() throws Exception{ + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE, schemaContext); transactionProxy.read(TestModel.TEST_PATH); transactionProxy.close(); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); - - Assert.assertNotNull(messages); - - Assert.assertTrue(messages instanceof List); - - List listMessages = (List) messages; - - Assert.assertEquals(1, listMessages.size()); - - Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)); - } - - private CreateTransactionReply createTransactionReply(ActorRef actorRef){ - return CreateTransactionReply.newBuilder() - .setTransactionActorPath(actorRef.path().toString()) - .setTransactionId("txn-1") - .build(); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 5874eccda4..fda9ccdfdb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,11 +1,14 @@ package org.opendaylight.controller.cluster.datastore.utils; +import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; import akka.testkit.JavaTestKit; + import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -14,6 +17,9 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; @@ -74,14 +80,23 @@ public class ActorContextTest extends AbstractActorTest{ } private static Props props(final boolean found, final ActorRef actorRef){ - return Props.create(new Creator() { + return Props.create(new MockShardManagerCreator(found, actorRef) ); + } - @Override public MockShardManager create() - throws Exception { - return new MockShardManager(found, - actorRef); - } - }); + @SuppressWarnings("serial") + private static class MockShardManagerCreator implements Creator { + final boolean found; + final ActorRef actorRef; + + MockShardManagerCreator(boolean found, ActorRef actorRef) { + this.found = found; + this.actorRef = actorRef; + } + + @Override + public MockShardManager create() throws Exception { + return new MockShardManager(found, actorRef); + } } } @@ -90,6 +105,7 @@ public class ActorContextTest extends AbstractActorTest{ new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); @@ -118,6 +134,7 @@ public class ActorContextTest extends AbstractActorTest{ new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { ActorRef shardManagerActorRef = getSystem() @@ -145,6 +162,7 @@ public class ActorContextTest extends AbstractActorTest{ new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); @@ -173,6 +191,7 @@ public class ActorContextTest extends AbstractActorTest{ new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { ActorRef shardManagerActorRef = getSystem() @@ -193,4 +212,68 @@ public class ActorContextTest extends AbstractActorTest{ }}; } + + @Test + public void testExecuteRemoteOperation() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("3 seconds")) { + @Override + protected void run() { + + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(true, shardActorRef)); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + mock(Configuration.class)); + + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + + Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds")); + + assertEquals("hello", out); + + expectNoMsg(); + } + }; + }}; + } + + @Test + public void testExecuteRemoteOperationAsync() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("3 seconds")) { + @Override + protected void run() { + + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(true, shardActorRef)); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + mock(Configuration.class)); + + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + + Future future = actorContext.executeRemoteOperationAsync(actor, "hello", + Duration.create(3, TimeUnit.SECONDS)); + + try { + Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); + assertEquals("Result", "hello", result); + } catch(Exception e) { + throw new AssertionError(e); + } + + expectNoMsg(); + } + }; + }}; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java index 5d3853f311..b19fd3a529 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java @@ -8,7 +8,7 @@ package org.opendaylight.controller.cluster.datastore.utils; - +import static org.junit.Assert.assertNotNull; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; @@ -16,10 +16,12 @@ import scala.concurrent.duration.FiniteDuration; public class MockActorContext extends ActorContext { - private Object executeShardOperationResponse; - private Object executeRemoteOperationResponse; - private Object executeLocalOperationResponse; - private Object executeLocalShardOperationResponse; + private volatile Object executeShardOperationResponse; + private volatile Object executeRemoteOperationResponse; + private volatile Object executeLocalOperationResponse; + private volatile Object executeLocalShardOperationResponse; + private volatile Exception executeRemoteOperationFailure; + private volatile Object inputMessage; public MockActorContext(ActorSystem actorSystem) { super(actorSystem, null, new MockClusterWrapper(), new MockConfiguration()); @@ -52,6 +54,10 @@ public class MockActorContext extends ActorContext { executeRemoteOperationResponse = response; } + public void setExecuteRemoteOperationFailure(Exception executeRemoteOperationFailure) { + this.executeRemoteOperationFailure = executeRemoteOperationFailure; + } + public void setExecuteLocalOperationResponse( Object executeLocalOperationResponse) { this.executeLocalOperationResponse = executeLocalOperationResponse; @@ -62,12 +68,20 @@ public class MockActorContext extends ActorContext { this.executeLocalShardOperationResponse = executeLocalShardOperationResponse; } - @Override public Object executeLocalOperation(ActorRef actor, + @SuppressWarnings("unchecked") + public T getInputMessage(Class expType) throws Exception { + assertNotNull("Input message was null", inputMessage); + return (T) expType.getMethod("fromSerializable", Object.class).invoke(null, inputMessage); + } + + @Override + public Object executeLocalOperation(ActorRef actor, Object message, FiniteDuration duration) { return this.executeLocalOperationResponse; } - @Override public Object executeLocalShardOperation(String shardName, + @Override + public Object executeLocalShardOperation(String shardName, Object message, FiniteDuration duration) { return this.executeLocalShardOperationResponse; } -- 2.36.6