import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.PropertyUtils;
-import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
*/
public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable {
- private static final Logger
- LOG = LoggerFactory.getLogger(DistributedDataStore.class);
-
- private static final String EXECUTOR_MAX_POOL_SIZE_PROP =
- "mdsal.dist-datastore-executor-pool.size";
- private static final int DEFAULT_EXECUTOR_MAX_POOL_SIZE = 10;
-
- private static final String EXECUTOR_MAX_QUEUE_SIZE_PROP =
- "mdsal.dist-datastore-executor-queue.size";
- private static final int DEFAULT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
private final ActorContext actorContext;
private SchemaContext schemaContext;
- /**
- * Executor used to run FutureTask's
- *
- * This is typically used when we need to make a request to an actor and
- * wait for it's response and the consumer needs to be provided a Future.
- */
- private final ListeningExecutorService executor =
- MoreExecutors.listeningDecorator(
- SpecialExecutors.newBlockingBoundedFastThreadPool(
- PropertyUtils.getIntSystemProperty(
- EXECUTOR_MAX_POOL_SIZE_PROP,
- DEFAULT_EXECUTOR_MAX_POOL_SIZE),
- PropertyUtils.getIntSystemProperty(
- EXECUTOR_MAX_QUEUE_SIZE_PROP,
- DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore"));
-
public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
Configuration configuration, InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
}
+ @SuppressWarnings("unchecked")
@Override
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
+ public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ ListenerRegistration<L> registerChangeListener(
YangInstanceIdentifier path, L listener,
AsyncDataBroker.DataChangeScope scope) {
Preconditions.checkNotNull(path, "path should not be null");
Preconditions.checkNotNull(listener, "listener should not be null");
-
LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
Object result = actorContext.executeLocalShardOperation(shardName,
- new RegisterChangeListener(path, dataChangeListenerActor.path(),
- scope),
- ActorContext.ASK_DURATION
- );
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ ActorContext.ASK_DURATION);
if (result != null) {
RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
LOG.debug(
"No local shard for shardName {} was found so returning a noop registration",
shardName);
+
return new NoOpDataChangeListenerRegistration(listener);
}
-
-
-
-
@Override
public DOMStoreTransactionChain createTransactionChain() {
- return new TransactionChainProxy(actorContext, executor, schemaContext);
+ return new TransactionChainProxy(actorContext, schemaContext);
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
- executor, schemaContext);
+ schemaContext);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
- executor, schemaContext);
+ schemaContext);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
- executor, schemaContext);
+ schemaContext);
}
@Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
} else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
} else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- deleteData(transaction,DeleteData.fromSerizalizable(message));
+ deleteData(transaction,DeleteData.fromSerializable(message));
} else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
readyTransaction(transaction,new ReadyTransaction());
} else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
} else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
} else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- deleteData(transaction,DeleteData.fromSerizalizable(message));
+ deleteData(transaction,DeleteData.fromSerializable(message));
} else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
readyTransaction(transaction,new ReadyTransaction());
}else {
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Callable;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
*/
-public class ThreePhaseCommitCohortProxy implements
- DOMStoreThreePhaseCommitCohort{
+public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
- private static final Logger
- LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
private final ActorContext actorContext;
private final List<ActorPath> cohortPaths;
- private final ListeningExecutorService executor;
private final String transactionId;
-
- public ThreePhaseCommitCohortProxy(ActorContext actorContext,
- List<ActorPath> cohortPaths,
- String transactionId,
- ListeningExecutorService executor) {
-
+ public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths,
+ String transactionId) {
this.actorContext = actorContext;
this.cohortPaths = cohortPaths;
this.transactionId = transactionId;
- this.executor = executor;
}
- @Override public ListenableFuture<Boolean> canCommit() {
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
LOG.debug("txn {} canCommit", transactionId);
- Callable<Boolean> call = new Callable<Boolean>() {
+ Future<Iterable<Object>> combinedFuture =
+ invokeCohorts(new CanCommitTransaction().toSerializable());
+
+ final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+ combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@Override
- public Boolean call() throws Exception {
- for(ActorPath actorPath : cohortPaths){
-
- Object message = new CanCommitTransaction().toSerializable();
- LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
-
- ActorSelection cohort = actorContext.actorSelection(actorPath);
-
- try {
- Object response =
- actorContext.executeRemoteOperation(cohort,
- message,
- ActorContext.ASK_DURATION);
-
- if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
- CanCommitTransactionReply reply =
- CanCommitTransactionReply.fromSerializable(response);
- if (!reply.getCanCommit()) {
- return false;
- }
+ public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
+ if(failure != null) {
+ returnFuture.setException(failure);
+ return;
+ }
+
+ boolean result = true;
+ for(Object response: responses) {
+ if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+ CanCommitTransactionReply reply =
+ CanCommitTransactionReply.fromSerializable(response);
+ if (!reply.getCanCommit()) {
+ result = false;
+ break;
}
- } catch(RuntimeException e){
- // FIXME : Need to properly handle this
- LOG.error("Unexpected Exception", e);
- return false;
+ } else {
+ LOG.error("Unexpected response type {}", response.getClass());
+ returnFuture.setException(new IllegalArgumentException(
+ String.format("Unexpected response type {}", response.getClass())));
+ return;
}
}
- return true;
+ returnFuture.set(Boolean.valueOf(result));
}
- };
+ }, actorContext.getActorSystem().dispatcher());
+
+ return returnFuture;
+ }
+
+ private Future<Iterable<Object>> invokeCohorts(Object message) {
+ List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
+ for(ActorPath actorPath : cohortPaths) {
+
+ LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
- return executor.submit(call);
+ ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+ futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
+ ActorContext.ASK_DURATION));
+ }
+
+ return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
}
- @Override public ListenableFuture<Void> preCommit() {
+ @Override
+ public ListenableFuture<Void> preCommit() {
LOG.debug("txn {} preCommit", transactionId);
- return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
+ return voidOperation(new PreCommitTransaction().toSerializable(),
+ PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
}
- @Override public ListenableFuture<Void> abort() {
+ @Override
+ public ListenableFuture<Void> abort() {
LOG.debug("txn {} abort", transactionId);
- return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
+
+ // Note - we pass false for propagateException. In the front-end data broker, this method
+ // is called when one of the 3 phases fails with an exception. We'd rather have that
+ // original exception propagated to the client. If our abort fails and we propagate the
+ // exception then that exception will supersede and suppress the original exception. But
+ // it's the original exception that is the root cause and of more interest to the client.
+
+ return voidOperation(new AbortTransaction().toSerializable(),
+ AbortTransactionReply.SERIALIZABLE_CLASS, false);
}
- @Override public ListenableFuture<Void> commit() {
+ @Override
+ public ListenableFuture<Void> commit() {
LOG.debug("txn {} commit", transactionId);
- return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
+ return voidOperation(new CommitTransaction().toSerializable(),
+ CommitTransactionReply.SERIALIZABLE_CLASS, true);
}
- private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
- Callable<Void> call = new Callable<Void>() {
-
- @Override public Void call() throws Exception {
- for(ActorPath actorPath : cohortPaths){
- ActorSelection cohort = actorContext.actorSelection(actorPath);
-
- LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
-
- try {
- Object response =
- actorContext.executeRemoteOperation(cohort,
- message,
- ActorContext.ASK_DURATION);
-
- if (response != null && !response.getClass()
- .equals(expectedResponseClass)) {
- throw new RuntimeException(
- String.format(
- "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s",
- expectedResponseClass.toString(),
- response.getClass().toString())
- );
+ private ListenableFuture<Void> voidOperation(final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException) {
+
+ Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+
+ final SettableFuture<Void> returnFuture = SettableFuture.create();
+
+ combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
+
+ Throwable exceptionToPropagate = failure;
+ if(exceptionToPropagate == null) {
+ for(Object response: responses) {
+ if(!response.getClass().equals(expectedResponseClass)) {
+ exceptionToPropagate = new IllegalArgumentException(
+ String.format("Unexpected response type {}",
+ response.getClass()));
+ break;
}
- } catch(TimeoutException e){
- LOG.error(String.format("A timeout occurred when processing operation : %s", message));
}
}
- return null;
+
+ if(exceptionToPropagate != null) {
+ if(propagateException) {
+ // We don't log the exception here to avoid redundant logging since we're
+ // propagating to the caller in MD-SAL core who will log it.
+ returnFuture.setException(exceptionToPropagate);
+ } else {
+ // Since the caller doesn't want us to propagate the exception we'll also
+ // not log it normally. But it's usually not good to totally silence
+ // exceptions so we'll log it to debug level.
+ LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
+ exceptionToPropagate);
+ returnFuture.set(null);
+ }
+ } else {
+ returnFuture.set(null);
+ }
}
- };
+ }, actorContext.getActorSystem().dispatcher());
- return executor.submit(call);
+ return returnFuture;
}
public List<ActorPath> getCohortPaths() {
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
/**
* TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
*/
public class TransactionChainProxy implements DOMStoreTransactionChain{
private final ActorContext actorContext;
- private final ListeningExecutorService transactionExecutor;
private final SchemaContext schemaContext;
- public TransactionChainProxy(ActorContext actorContext, ListeningExecutorService transactionExecutor,
- SchemaContext schemaContext) {
+ public TransactionChainProxy(ActorContext actorContext, SchemaContext schemaContext) {
this.actorContext = actorContext;
- this.transactionExecutor = transactionExecutor;
this.schemaContext = schemaContext;
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, schemaContext);
+ TransactionProxy.TransactionType.READ_ONLY, schemaContext);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor, schemaContext);
+ TransactionProxy.TransactionType.WRITE_ONLY, schemaContext);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_WRITE, transactionExecutor, schemaContext);
+ TransactionProxy.TransactionType.READ_WRITE, schemaContext);
}
@Override
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
+import akka.dispatch.OnComplete;
+
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
/**
private final ActorContext actorContext;
private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
private final TransactionIdentifier identifier;
- private final ListeningExecutorService executor;
private final SchemaContext schemaContext;
+ private boolean inReadyState;
- public TransactionProxy(
- ActorContext actorContext,
- TransactionType transactionType,
- ListeningExecutorService executor,
- SchemaContext schemaContext
- ) {
+ public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
+ SchemaContext schemaContext) {
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
- this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
String memberName = actorContext.getCurrentMemberName();
if(memberName == null){
memberName = "UNKNOWN-MEMBER";
}
- this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
+
+ this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
+ counter.getAndIncrement()).build();
LOG.debug("Created txn {}", identifier);
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
+ Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
+ "Read operation on write-only transaction is not allowed");
+
LOG.debug("txn {} read {}", identifier, path);
createTransactionIfMissing(actorContext, path);
return transactionContext(path).readData(path);
}
- @Override public CheckedFuture<Boolean, ReadFailedException> exists(
- YangInstanceIdentifier path) {
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
+
+ Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
+ "Exists operation on write-only transaction is not allowed");
+
LOG.debug("txn {} exists {}", identifier, path);
createTransactionIfMissing(actorContext, path);
return transactionContext(path).dataExists(path);
}
+ private void checkModificationState() {
+ Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
+ "Modification operation on read-only transaction is not allowed");
+ Preconditions.checkState(!inReadyState,
+ "Transaction is sealed - further modifications are allowed");
+ }
+
@Override
public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ checkModificationState();
+
LOG.debug("txn {} write {}", identifier, path);
createTransactionIfMissing(actorContext, path);
@Override
public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ checkModificationState();
+
LOG.debug("txn {} merge {}", identifier, path);
createTransactionIfMissing(actorContext, path);
@Override
public void delete(YangInstanceIdentifier path) {
+ checkModificationState();
+
LOG.debug("txn {} delete {}", identifier, path);
createTransactionIfMissing(actorContext, path);
@Override
public DOMStoreThreePhaseCommitCohort ready() {
+
+ checkModificationState();
+
+ inReadyState = true;
+
List<ActorPath> cohortPaths = new ArrayList<>();
- LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
+ LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier,
+ remoteTransactionPaths.size());
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
+ LOG.debug("txn {} Readying transaction for shard {}", identifier,
+ transactionContext.getShardName());
Object result = transactionContext.readyTransaction();
if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
- ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
- String resolvedCohortPath = transactionContext
- .getResolvedCohortPath(reply.getCohortPath().toString());
+ ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
+ actorContext.getActorSystem(),result);
+ String resolvedCohortPath = transactionContext.getResolvedCohortPath(
+ reply.getCohortPath().toString());
cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
+ } else {
+ LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS,
+ result.getClass());
}
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString());
}
@Override
Object response = actorContext.executeShardOperation(shardName,
new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
ActorContext.ASK_DURATION);
- if (response.getClass()
- .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+ if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
CreateTransactionReply reply =
CreateTransactionReply.fromSerializable(response);
transactionActor);
remoteTransactionPaths.put(shardName, transactionContext);
+ } else {
+ LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS,
+ response.getClass());
}
- } catch(TimeoutException | PrimaryNotFoundException e){
+ } catch(Exception e){
LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
- remoteTransactionPaths.put(shardName,
- new NoOpTransactionContext(shardName));
+ remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
}
}
this.actor = actor;
}
- @Override public String getShardName() {
+ @Override
+ public String getShardName() {
return shardName;
}
return actor;
}
- @Override public String getResolvedCohortPath(String cohortPath) {
+ @Override
+ public String getResolvedCohortPath(String cohortPath) {
return actorContext.resolvePath(actorPath, cohortPath);
}
- @Override public void closeTransaction() {
- getActor().tell(
- new CloseTransaction().toSerializable(), null);
+ @Override
+ public void closeTransaction() {
+ actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
}
- @Override public Object readyTransaction() {
+ @Override
+ public Object readyTransaction() {
return actorContext.executeRemoteOperation(getActor(),
- new ReadyTransaction().toSerializable(),
- ActorContext.ASK_DURATION
- );
-
+ new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
}
- @Override public void deleteData(YangInstanceIdentifier path) {
- getActor().tell(new DeleteData(path).toSerializable(), null);
+ @Override
+ public void deleteData(YangInstanceIdentifier path) {
+ actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() );
}
- @Override public void mergeData(YangInstanceIdentifier path,
- NormalizedNode<?, ?> data) {
- getActor()
- .tell(new MergeData(path, data, schemaContext).toSerializable(),
- null);
+ @Override
+ public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ actorContext.sendRemoteOperationAsync(getActor(),
+ new MergeData(path, data, schemaContext).toSerializable());
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
final YangInstanceIdentifier path) {
- Callable<Optional<NormalizedNode<?, ?>>> call =
- new Callable<Optional<NormalizedNode<?, ?>>>() {
-
- @Override public Optional<NormalizedNode<?, ?>> call()
- throws Exception {
- Object response = actorContext
- .executeRemoteOperation(getActor(),
- new ReadData(path).toSerializable(),
- ActorContext.ASK_DURATION);
- if (response.getClass()
- .equals(ReadDataReply.SERIALIZABLE_CLASS)) {
- ReadDataReply reply = ReadDataReply
- .fromSerializable(schemaContext, path,
- response);
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
+
+ OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) throws Throwable {
+ if(failure != null) {
+ returnFuture.setException(new ReadFailedException(
+ "Error reading data for path " + path, failure));
+ } else {
+ if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+ ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
+ path, response);
if (reply.getNormalizedNode() == null) {
- return Optional.absent();
+ returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
+ } else {
+ returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
+ reply.getNormalizedNode()));
}
- return Optional.<NormalizedNode<?, ?>>of(
- reply.getNormalizedNode());
+ } else {
+ returnFuture.setException(new ReadFailedException(
+ "Invalid response reading data for path " + path));
}
-
- throw new ReadFailedException("Read Failed " + path);
}
- };
+ }
+ };
- return MappingCheckedFuture
- .create(executor.submit(call), ReadFailedException.MAPPER);
- }
+ Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+ new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
+ future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
- @Override public void writeData(YangInstanceIdentifier path,
- NormalizedNode<?, ?> data) {
- getActor()
- .tell(new WriteData(path, data, schemaContext).toSerializable(),
- null);
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
}
- @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
- final YangInstanceIdentifier path) {
-
- Callable<Boolean> call = new Callable<Boolean>() {
-
- @Override public Boolean call() throws Exception {
- Object o = actorContext.executeRemoteOperation(getActor(),
- new DataExists(path).toSerializable(),
- ActorContext.ASK_DURATION
- );
-
+ @Override
+ public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ actorContext.sendRemoteOperationAsync(getActor(),
+ new WriteData(path, data, schemaContext).toSerializable());
+ }
- if (DataExistsReply.SERIALIZABLE_CLASS
- .equals(o.getClass())) {
- return DataExistsReply.fromSerializable(o).exists();
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> dataExists(
+ final YangInstanceIdentifier path) {
+
+ final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+ OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) throws Throwable {
+ if(failure != null) {
+ returnFuture.setException(new ReadFailedException(
+ "Error checking exists for path " + path, failure));
+ } else {
+ if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+ returnFuture.set(Boolean.valueOf(DataExistsReply.
+ fromSerializable(response).exists()));
+ } else {
+ returnFuture.setException(new ReadFailedException(
+ "Invalid response checking exists for path " + path));
+ }
}
-
- throw new ReadFailedException("Exists Failed " + path);
}
};
- return MappingCheckedFuture
- .create(executor.submit(call), ReadFailedException.MAPPER);
+
+ Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+ new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
+ future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
}
}
LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
private final String shardName;
+ private final Exception failure;
private ActorRef cohort;
- public NoOpTransactionContext(String shardName){
+ public NoOpTransactionContext(String shardName, Exception failure){
this.shardName = shardName;
+ this.failure = failure;
}
- @Override public String getShardName() {
+
+ @Override
+ public String getShardName() {
return shardName;
}
- @Override public String getResolvedCohortPath(String cohortPath) {
+ @Override
+ public String getResolvedCohortPath(String cohortPath) {
return cohort.path().toString();
}
- @Override public void closeTransaction() {
+ @Override
+ public void closeTransaction() {
LOG.warn("txn {} closeTransaction called", identifier);
}
return new ReadyTransactionReply(cohort.path()).toSerializable();
}
- @Override public void deleteData(YangInstanceIdentifier path) {
+ @Override
+ public void deleteData(YangInstanceIdentifier path) {
LOG.warn("txt {} deleteData called path = {}", identifier, path);
}
- @Override public void mergeData(YangInstanceIdentifier path,
+ @Override
+ public void mergeData(YangInstanceIdentifier path,
NormalizedNode<?, ?> data) {
LOG.warn("txn {} mergeData called path = {}", identifier, path);
}
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
LOG.warn("txn {} readData called path = {}", identifier, path);
- return Futures.immediateCheckedFuture(
- Optional.<NormalizedNode<?, ?>>absent());
+ return Futures.immediateFailedCheckedFuture(new ReadFailedException(
+ "Error reading data for path " + path, failure));
}
@Override public void writeData(YangInstanceIdentifier path,
@Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
YangInstanceIdentifier path) {
LOG.warn("txn {} dataExists called path = {}", identifier, path);
-
- // Returning false instead of an exception to keep this aligned with
- // read
- return Futures.immediateCheckedFuture(false);
+ return Futures.immediateFailedCheckedFuture(new ReadFailedException(
+ "Error checking exists for path " + path, failure));
}
}
.setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path)).build();
}
- public static DeleteData fromSerizalizable(Object serializable){
+ public static DeleteData fromSerializable(Object serializable){
ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable;
return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
}
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.util.Timeout;
+
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
- private SchemaContext schemaContext = null;
-
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper,
Configuration configuration) {
}
}
+ /**
+ * Execute an operation on a remote actor asynchronously.
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ * @param duration the maximum amount of time to send he message
+ * @return a Future containing the eventual result
+ */
+ public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
+ FiniteDuration duration) {
+
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+
+ return ask(actor, message, new Timeout(duration));
+ }
+
+ /**
+ * Sends an operation to be executed by a remote actor asynchronously without waiting for a
+ * reply (essentially set and forget).
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ */
+ public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
+ actor.tell(message, ActorRef.noSender());
+ }
+
/**
* Execute an operation on the primary for a given shard
* <p>
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
+import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
import akka.actor.Props;
+import akka.dispatch.Futures;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import junit.framework.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.Stubber;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import scala.concurrent.duration.FiniteDuration;
-import java.util.Arrays;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertNotNull;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
- private ThreePhaseCommitCohortProxy proxy;
- private Props props;
- private ActorRef actorRef;
- private MockActorContext actorContext;
- private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
- Executors.newSingleThreadExecutor());
+ @Mock
+ private ActorContext actorContext;
@Before
- public void setUp(){
- props = Props.create(MessageCollectorActor.class);
- actorRef = getSystem().actorOf(props);
- actorContext = new MockActorContext(this.getSystem());
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
- proxy =
- new ThreePhaseCommitCohortProxy(actorContext,
- Arrays.asList(actorRef.path()), "txn-1", executor);
+ doReturn(getSystem()).when(actorContext).getActorSystem();
+ }
+ private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) {
+ List<ActorPath> cohorts = Lists.newArrayList();
+ for(int i = 1; i <= nCohorts; i++) {
+ ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path();
+ cohorts.add(path);
+ doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+ }
+
+ return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
}
- @After
- public void tearDown() {
- executor.shutdownNow();
+ private void setupMockActorContext(Class<?> requestType, Object... responses) {
+ Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
+ .failed((Throwable) responses[0]) : Futures
+ .successful(((SerializableMessage) responses[0]).toSerializable()));
+
+ for(int i = 1; i < responses.length; i++) {
+ stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
+ .failed((Throwable) responses[i]) : Futures
+ .successful(((SerializableMessage) responses[i]).toSerializable()));
+ }
+
+ stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
+ isA(requestType), any(FiniteDuration.class));
+ }
+
+ private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
+ verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
+ any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
+ }
+
+ @Test
+ public void testCanCommitWithOneCohort() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new CanCommitTransactionReply(true));
+
+ ListenableFuture<Boolean> future = proxy.canCommit();
+
+ assertEquals("canCommit", true, future.get());
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new CanCommitTransactionReply(false));
+
+ future = proxy.canCommit();
+
+ assertEquals("canCommit", false, future.get());
+
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
@Test
- public void testCanCommit() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable());
+ public void testCanCommitWithMultipleCohorts() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
ListenableFuture<Boolean> future = proxy.canCommit();
- Assert.assertTrue(future.get().booleanValue());
+ assertEquals("canCommit", true, future.get());
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+ }
+
+ @Test
+ public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(3);
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
+ new CanCommitTransactionReply(true));
+
+ ListenableFuture<Boolean> future = proxy.canCommit();
+
+ assertEquals("canCommit", false, future.get());
+
+ verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testCanCommitWithExceptionFailure() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+
+ proxy.canCommit().get();
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testCanCommitWithInvalidResponseType() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new PreCommitTransactionReply());
+
+ proxy.canCommit().get();
}
@Test
public void testPreCommit() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply().toSerializable());
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- ListenableFuture<Void> future = proxy.preCommit();
+ setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+ new PreCommitTransactionReply());
- future.get();
+ proxy.preCommit().get();
+ verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testPreCommitWithFailure() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+ setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+ new PreCommitTransactionReply(), new RuntimeException("mock"));
+
+ proxy.preCommit().get();
}
@Test
public void testAbort() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply().toSerializable());
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- ListenableFuture<Void> future = proxy.abort();
+ setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
- future.get();
+ proxy.abort().get();
+ verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
+ }
+
+ @Test
+ public void testAbortWithFailure() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+ setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+
+ // The exception should not get propagated.
+ proxy.abort().get();
+
+ verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
}
@Test
public void testCommit() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply().toSerializable());
- ListenableFuture<Void> future = proxy.commit();
+ ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+ setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+ new CommitTransactionReply());
+
+ proxy.commit().get();
+
+ verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testCommitWithFailure() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(2);
- future.get();
+ setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+ new RuntimeException("mock"));
+
+ proxy.commit().get();
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void teseCommitWithInvalidResponseType() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+ setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
+
+ proxy.commit().get();
}
@Test
- public void testGetCohortPaths() throws Exception {
- assertNotNull(proxy.getCohortPaths());
+ public void testGetCohortPaths() {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+ List<ActorPath> paths = proxy.getCohortPaths();
+ assertNotNull("getCohortPaths returned null", paths);
+ assertEquals("getCohortPaths size", 2, paths.size());
}
}
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import akka.actor.ActorPath;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.Props;
+import akka.dispatch.Futures;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import junit.framework.Assert;
-import org.junit.After;
+
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import java.util.List;
-import java.util.concurrent.Executors;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
-import static junit.framework.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.isA;
+
+@SuppressWarnings("resource")
public class TransactionProxyTest extends AbstractActorTest {
+ @SuppressWarnings("serial")
+ static class TestException extends RuntimeException {
+ }
+
+ static interface Invoker {
+ void invoke(TransactionProxy proxy) throws Exception;
+ }
+
private final Configuration configuration = new MockConfiguration();
- private final ActorContext testContext =
- new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );
+ @Mock
+ private ActorContext mockActorContext;
- private final ListeningExecutorService transactionExecutor =
- MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ private SchemaContext schemaContext;
+
+ String memberName = "mock-member";
@Before
public void setUp(){
- ShardStrategyFactory.setConfiguration(configuration);
- }
+ MockitoAnnotations.initMocks(this);
- @After
- public void tearDown() {
- transactionExecutor.shutdownNow();
- }
+ schemaContext = TestModel.createTestContext();
- @Test
- public void testRead() throws Exception {
- final Props props = Props.create(DoNothingActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ doReturn(getSystem()).when(mockActorContext).getActorSystem();
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ ShardStrategyFactory.setConfiguration(configuration);
+ }
+ private CreateTransaction eqCreateTransaction(final String memberName,
+ final TransactionType type) {
+ ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
+ @Override
+ public boolean matches(Object argument) {
+ CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+ return obj.getTransactionId().startsWith(memberName) &&
+ obj.getTransactionType() == type.ordinal();
+ }
+ };
+
+ return argThat(matcher);
+ }
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ private DataExists eqDataExists() {
+ ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+ @Override
+ public boolean matches(Object argument) {
+ DataExists obj = DataExists.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+ return argThat(matcher);
+ }
- actorContext.setExecuteRemoteOperationResponse(
- new ReadDataReply(TestModel.createTestContext(), null)
- .toSerializable());
+ private ReadData eqReadData() {
+ ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+ @Override
+ public boolean matches(Object argument) {
+ ReadData obj = ReadData.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH);
+ }
+ };
- ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
- transactionProxy.read(TestModel.TEST_PATH);
+ return argThat(matcher);
+ }
- Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
+ private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ WriteData obj = WriteData.fromSerializable(argument, schemaContext);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+ };
+
+ return argThat(matcher);
+ }
- Assert.assertFalse(normalizedNodeOptional.isPresent());
+ private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
+ @Override
+ public boolean matches(Object argument) {
+ MergeData obj = MergeData.fromSerializable(argument, schemaContext);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+ };
+
+ return argThat(matcher);
+ }
- actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
- TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
+ private DeleteData eqDeleteData() {
+ ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ DeleteData obj = DeleteData.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH);
+ }
+ };
- read = transactionProxy.read(TestModel.TEST_PATH);
+ return argThat(matcher);
+ }
- normalizedNodeOptional = read.get();
+ private Object readyTxReply(ActorPath path) {
+ return new ReadyTransactionReply(path).toSerializable();
+ }
- Assert.assertTrue(normalizedNodeOptional.isPresent());
+ private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
+ return Futures.successful(new ReadDataReply(schemaContext, data)
+ .toSerializable());
}
- @Test
- public void testExists() throws Exception {
- final Props props = Props.create(DoNothingActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ private Future<Object> dataExistsReply(boolean exists) {
+ return Futures.successful(new DataExistsReply(exists).toSerializable());
+ }
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ private ActorSelection actorSelection(ActorRef actorRef) {
+ return getSystem().actorSelection(actorRef.path());
+ }
+ private FiniteDuration anyDuration() {
+ return any(FiniteDuration.class);
+ }
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+ return CreateTransactionReply.newBuilder()
+ .setTransactionActorPath(actorRef.path().toString())
+ .setTransactionId("txn-1").build();
+ }
+ private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
+ ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
+ doReturn(getSystem().actorSelection(actorRef.path())).
+ when(mockActorContext).actorSelection(actorRef.path().toString());
+ doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+ doReturn(createTransactionReply(actorRef)).when(mockActorContext).
+ executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
+ eqCreateTransaction(memberName, type), anyDuration());
+ doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
+ anyString(), eq(actorRef.path().toString()));
+ doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
+
+ return actorRef;
+ }
- actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(false).toSerializable());
+ @Test
+ public void testRead() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
- CheckedFuture<Boolean, ReadFailedException> exists =
- transactionProxy.exists(TestModel.TEST_PATH);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- Assert.assertFalse(exists.checkedGet());
+ doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
- actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(true).toSerializable());
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+ TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
- exists = transactionProxy.exists(TestModel.TEST_PATH);
+ assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
- Assert.assertTrue(exists.checkedGet());
+ NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- actorContext.setExecuteRemoteOperationResponse("bad message");
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
- exists = transactionProxy.exists(TestModel.TEST_PATH);
+ readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
- try {
- exists.checkedGet();
- fail();
- } catch(ReadFailedException e){
- }
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+ assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
}
@Test(expected = ReadFailedException.class)
public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception {
- final Props props = Props.create(DoNothingActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
-
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
-
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ doReturn(Futures.successful(new Object())).when(mockActorContext).
+ executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>
- read = transactionProxy.read(TestModel.TEST_PATH);
-
- read.checkedGet();
+ transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
}
- @Test
- public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
- final ActorContext actorContext = mock(ActorContext.class);
-
- when(actorContext.executeShardOperation(anyString(), any(), any(
- FiniteDuration.class))).thenThrow(new PrimaryNotFoundException("test"));
+ @Test(expected = TestException.class)
+ public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
+ setupActorContextWithInitialCreateTransaction(READ_ONLY);
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ doThrow(new TestException()).when(mockActorContext).
+ executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
- transactionProxy.read(TestModel.TEST_PATH);
-
- Assert.assertFalse(read.get().isPresent());
-
+ try {
+ transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected ReadFailedException");
+ } catch(ReadFailedException e) {
+ // Expected - throw cause - expects TestException.
+ throw e.getCause();
+ }
}
+ private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
+ throws Throwable {
- @Test
- public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
- final ActorContext actorContext = mock(ActorContext.class);
+ doThrow(exToThrow).when(mockActorContext).executeShardOperation(
+ anyString(), any(), anyDuration());
- when(actorContext.executeShardOperation(anyString(), any(), any(
- FiniteDuration.class))).thenThrow(new TimeoutException("test", new Exception("reason")));
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ try {
+ invoker.invoke(transactionProxy);
+ fail("Expected ReadFailedException");
+ } catch(ReadFailedException e) {
+ // Expected - throw cause - expects TestException.
+ throw e.getCause();
+ }
+ }
+ private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
+ testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
+ @Override
+ public void invoke(TransactionProxy proxy) throws Exception {
+ proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ }
+ });
+ }
- ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
- transactionProxy.read(TestModel.TEST_PATH);
+ @Test(expected = PrimaryNotFoundException.class)
+ public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
+ testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
+ }
- Assert.assertFalse(read.get().isPresent());
+ @Test(expected = TimeoutException.class)
+ public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
+ testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
+ new Exception("reason")));
+ }
+ @Test(expected = TestException.class)
+ public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
+ testReadWithExceptionOnInitialCreateTransaction(new TestException());
}
@Test
- public void testReadWhenAAnyOtherExceptionIsThrown() throws Exception {
- final ActorContext actorContext = mock(ActorContext.class);
-
- when(actorContext.executeShardOperation(anyString(), any(), any(
- FiniteDuration.class))).thenThrow(new NullPointerException());
-
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ public void testExists() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- try {
- ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
- transactionProxy.read(TestModel.TEST_PATH);
- fail("A null pointer exception was expected");
- } catch(NullPointerException e){
+ doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
- }
- }
+ Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+ assertEquals("Exists response", false, exists);
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
- @Test
- public void testWrite() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ assertEquals("Exists response", true, exists);
+ }
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ @Test(expected = PrimaryNotFoundException.class)
+ public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
+ testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
+ @Override
+ public void invoke(TransactionProxy proxy) throws Exception {
+ proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ }
+ });
+ }
- transactionProxy.write(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+ @Test(expected = ReadFailedException.class)
+ public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception {
+ setupActorContextWithInitialCreateTransaction(READ_ONLY);
- Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ doReturn(Futures.successful(new Object())).when(mockActorContext).
+ executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
- Assert.assertNotNull(messages);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- Assert.assertTrue(messages instanceof List);
+ transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ }
- List<Object> listMessages = (List<Object>) messages;
+ @Test(expected = TestException.class)
+ public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
+ setupActorContextWithInitialCreateTransaction(READ_ONLY);
- Assert.assertEquals(1, listMessages.size());
+ doThrow(new TestException()).when(mockActorContext).
+ executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
- Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
- }
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- private Object createPrimaryFound(ActorRef actorRef) {
- return new PrimaryFound(actorRef.path().toString()).toSerializable();
+ try {
+ transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected ReadFailedException");
+ } catch(ReadFailedException e) {
+ // Expected - throw cause - expects TestException.
+ throw e.getCause();
+ }
}
@Test
- public void testMerge() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ public void testWrite() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- transactionProxy.merge(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ verify(mockActorContext).sendRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ }
- Assert.assertNotNull(messages);
+ @Test
+ public void testMerge() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
- Assert.assertTrue(messages instanceof List);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
- List<Object> listMessages = (List<Object>) messages;
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- Assert.assertEquals(1, listMessages.size());
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
+ verify(mockActorContext).sendRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
}
@Test
public void testDelete() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
-
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
transactionProxy.delete(TestModel.TEST_PATH);
- Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
-
- Assert.assertNotNull(messages);
-
- Assert.assertTrue(messages instanceof List);
-
- List<Object> listMessages = (List<Object>) messages;
-
- Assert.assertEquals(1, listMessages.size());
-
- Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
+ verify(mockActorContext).sendRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDeleteData());
}
+ @SuppressWarnings("unchecked")
@Test
public void testReady() throws Exception {
- final Props props = Props.create(DoNothingActor.class);
- final ActorRef doNothingActorRef = getSystem().actorOf(props);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
- actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable());
+ doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
transactionProxy.read(TestModel.TEST_PATH);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
-
+ assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths());
}
@Test
- public void testGetIdentifier(){
- final Props props = Props.create(DoNothingActor.class);
- final ActorRef doNothingActorRef = getSystem().actorOf(props);
-
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
-
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
-
- Assert.assertNotNull(transactionProxy.getIdentifier());
+ public void testGetIdentifier() {
+ setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ TransactionProxy.TransactionType.READ_ONLY, schemaContext);
+
+ Object id = transactionProxy.getIdentifier();
+ assertNotNull("getIdentifier returned null", id);
+ assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
}
+ @SuppressWarnings("unchecked")
@Test
- public void testClose(){
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ public void testClose() throws Exception{
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
transactionProxy.read(TestModel.TEST_PATH);
transactionProxy.close();
- Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
-
- Assert.assertNotNull(messages);
-
- Assert.assertTrue(messages instanceof List);
-
- List<Object> listMessages = (List<Object>) messages;
-
- Assert.assertEquals(1, listMessages.size());
-
- Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS));
- }
-
- private CreateTransactionReply createTransactionReply(ActorRef actorRef){
- return CreateTransactionReply.newBuilder()
- .setTransactionActorPath(actorRef.path().toString())
- .setTransactionId("txn-1")
- .build();
+ verify(mockActorContext).sendRemoteOperationAsync(
+ eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
}
}
package org.opendaylight.controller.cluster.datastore.utils;
+import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
+
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
}
private static Props props(final boolean found, final ActorRef actorRef){
- return Props.create(new Creator<MockShardManager>() {
+ return Props.create(new MockShardManagerCreator(found, actorRef) );
+ }
- @Override public MockShardManager create()
- throws Exception {
- return new MockShardManager(found,
- actorRef);
- }
- });
+ @SuppressWarnings("serial")
+ private static class MockShardManagerCreator implements Creator<MockShardManager> {
+ final boolean found;
+ final ActorRef actorRef;
+
+ MockShardManagerCreator(boolean found, ActorRef actorRef) {
+ this.found = found;
+ this.actorRef = actorRef;
+ }
+
+ @Override
+ public MockShardManager create() throws Exception {
+ return new MockShardManager(found, actorRef);
+ }
}
}
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef shardManagerActorRef = getSystem()
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef shardManagerActorRef = getSystem()
}};
}
+
+ @Test
+ public void testExecuteRemoteOperation() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("3 seconds")) {
+ @Override
+ protected void run() {
+
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+
+ Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds"));
+
+ assertEquals("hello", out);
+
+ expectNoMsg();
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testExecuteRemoteOperationAsync() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("3 seconds")) {
+ @Override
+ protected void run() {
+
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+
+ Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello",
+ Duration.create(3, TimeUnit.SECONDS));
+
+ try {
+ Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
+ assertEquals("Result", "hello", result);
+ } catch(Exception e) {
+ throw new AssertionError(e);
+ }
+
+ expectNoMsg();
+ }
+ };
+ }};
+ }
}
package org.opendaylight.controller.cluster.datastore.utils;
-
+import static org.junit.Assert.assertNotNull;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
public class MockActorContext extends ActorContext {
- private Object executeShardOperationResponse;
- private Object executeRemoteOperationResponse;
- private Object executeLocalOperationResponse;
- private Object executeLocalShardOperationResponse;
+ private volatile Object executeShardOperationResponse;
+ private volatile Object executeRemoteOperationResponse;
+ private volatile Object executeLocalOperationResponse;
+ private volatile Object executeLocalShardOperationResponse;
+ private volatile Exception executeRemoteOperationFailure;
+ private volatile Object inputMessage;
public MockActorContext(ActorSystem actorSystem) {
super(actorSystem, null, new MockClusterWrapper(), new MockConfiguration());
executeRemoteOperationResponse = response;
}
+ public void setExecuteRemoteOperationFailure(Exception executeRemoteOperationFailure) {
+ this.executeRemoteOperationFailure = executeRemoteOperationFailure;
+ }
+
public void setExecuteLocalOperationResponse(
Object executeLocalOperationResponse) {
this.executeLocalOperationResponse = executeLocalOperationResponse;
this.executeLocalShardOperationResponse = executeLocalShardOperationResponse;
}
- @Override public Object executeLocalOperation(ActorRef actor,
+ @SuppressWarnings("unchecked")
+ public <T> T getInputMessage(Class<T> expType) throws Exception {
+ assertNotNull("Input message was null", inputMessage);
+ return (T) expType.getMethod("fromSerializable", Object.class).invoke(null, inputMessage);
+ }
+
+ @Override
+ public Object executeLocalOperation(ActorRef actor,
Object message, FiniteDuration duration) {
return this.executeLocalOperationResponse;
}
- @Override public Object executeLocalShardOperation(String shardName,
+ @Override
+ public Object executeLocalShardOperation(String shardName,
Object message, FiniteDuration duration) {
return this.executeLocalShardOperationResponse;
}