actorSystem, actorSystem.actorOf(
ShardManager.props(type, cluster, configuration, datastoreContext).
withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
+
+ actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
}
public DistributedDataStore(ActorContext actorContext) {
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
Object result = actorContext.executeLocalShardOperation(shardName,
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
- ActorContext.ASK_DURATION);
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope));
if (result != null) {
RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
private final int maxShardDataChangeExecutorQueueSize;
private final int maxShardDataChangeExecutorPoolSize;
private final int shardTransactionIdleTimeoutInMinutes;
+ private final int operationTimeoutInSeconds;
public DistributedDataStoreProperties() {
maxShardDataChangeListenerQueueSize = 1000;
maxShardDataChangeExecutorQueueSize = 1000;
maxShardDataChangeExecutorPoolSize = 20;
shardTransactionIdleTimeoutInMinutes = 10;
+ operationTimeoutInSeconds = 5;
}
public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
- int shardTransactionIdleTimeoutInMinutes) {
+ int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) {
this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
+ this.operationTimeoutInSeconds = operationTimeoutInSeconds;
}
public int getMaxShardDataChangeListenerQueueSize() {
public int getShardTransactionIdleTimeoutInMinutes() {
return shardTransactionIdleTimeoutInMinutes;
}
+
+ public int getOperationTimeoutInSeconds() {
+ return operationTimeoutInSeconds;
+ }
}
ActorSelection cohort = actorContext.actorSelection(actorPath);
- futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
- ActorContext.ASK_DURATION));
+ futureList.add(actorContext.executeRemoteOperationAsync(cohort, message));
}
return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
try {
Object response = actorContext.executeShardOperation(shardName,
- new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
- ActorContext.ASK_DURATION);
+ new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable());
if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
CreateTransactionReply reply =
CreateTransactionReply.fromSerializable(response);
// Send the ReadyTransaction message to the Tx actor.
final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
- new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+ new ReadyTransaction().toSerializable());
// Combine all the previously recorded put/merge/delete operation reply Futures and the
// ReadyTransactionReply Future into one Future. If any one fails then the combined
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", identifier, path);
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
- new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
+ new DeleteData(path).toSerializable() ));
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
- new MergeData(path, data, schemaContext).toSerializable(),
- ActorContext.ASK_DURATION));
+ new MergeData(path, data, schemaContext).toSerializable()));
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
- new WriteData(path, data, schemaContext).toSerializable(),
- ActorContext.ASK_DURATION));
+ new WriteData(path, data, schemaContext).toSerializable()));
}
@Override
};
Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
- new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
+ new ReadData(path).toSerializable());
readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
};
Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
- new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
+ new DataExists(path).toSerializable());
future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
}
private static final Logger
LOG = LoggerFactory.getLogger(ActorContext.class);
- public static final FiniteDuration ASK_DURATION =
- Duration.create(5, TimeUnit.SECONDS);
- public static final Duration AWAIT_DURATION =
- Duration.create(5, TimeUnit.SECONDS);
+ private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
public static final String MAILBOX = "bounded-mailbox";
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
private volatile SchemaContext schemaContext;
+ private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
+ private Timeout operationTimeout = new Timeout(operationDuration);
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper,
}
}
+ public void setOperationTimeout(int timeoutInSeconds) {
+ operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
+ operationTimeout = new Timeout(operationDuration);
+ }
+
public SchemaContext getSchemaContext() {
return schemaContext;
}
*/
public ActorRef findLocalShard(String shardName) {
Object result = executeLocalOperation(shardManager,
- new FindLocalShard(shardName), ASK_DURATION);
+ new FindLocalShard(shardName));
if (result instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound) result;
public String findPrimaryPath(String shardName) {
Object result = executeLocalOperation(shardManager,
- new FindPrimary(shardName).toSerializable(), ASK_DURATION);
+ new FindPrimary(shardName).toSerializable());
if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound found = PrimaryFound.fromSerializable(result);
*
* @param actor
* @param message
- * @param duration
* @return The response of the operation
*/
- public Object executeLocalOperation(ActorRef actor, Object message,
- FiniteDuration duration) {
- Future<Object> future =
- ask(actor, message, new Timeout(duration));
+ public Object executeLocalOperation(ActorRef actor, Object message) {
+ Future<Object> future = ask(actor, message, operationTimeout);
try {
- return Await.result(future, AWAIT_DURATION);
+ return Await.result(future, operationDuration);
} catch (Exception e) {
throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
}
*
* @param actor
* @param message
- * @param duration
* @return
*/
- public Object executeRemoteOperation(ActorSelection actor, Object message,
- FiniteDuration duration) {
+ public Object executeRemoteOperation(ActorSelection actor, Object message) {
LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
- Future<Object> future =
- ask(actor, message, new Timeout(duration));
+ Future<Object> future = ask(actor, message, operationTimeout);
try {
- return Await.result(future, AWAIT_DURATION);
+ return Await.result(future, operationDuration);
} catch (Exception e) {
- throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
+ throw new TimeoutException("Sending message " + message.getClass().toString() +
+ " to actor " + actor.toString() + " failed" , e);
}
}
*
* @param actor the ActorSelection
* @param message the message to send
- * @param duration the maximum amount of time to send he message
* @return a Future containing the eventual result
*/
- public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
- FiniteDuration duration) {
+ public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
- return ask(actor, message, new Timeout(duration));
+ return ask(actor, message, operationTimeout);
}
/**
*
* @param shardName
* @param message
- * @param duration
* @return
* @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
* @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
*/
- public Object executeShardOperation(String shardName, Object message,
- FiniteDuration duration) {
+ public Object executeShardOperation(String shardName, Object message) {
ActorSelection primary = findPrimary(shardName);
- return executeRemoteOperation(primary, message, duration);
+ return executeRemoteOperation(primary, message);
}
/**
*
* @param shardName the name of the shard on which the operation needs to be executed
* @param message the message that needs to be sent to the shard
- * @param duration the time duration in which this operation should complete
* @return the message that was returned by the local actor on which the
* the operation was executed. If a local shard was not found then
* null is returned
* @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
* if the operation does not complete in a specified time duration
*/
- public Object executeLocalShardOperation(String shardName, Object message,
- FiniteDuration duration) {
+ public Object executeLocalShardOperation(String shardName, Object message) {
ActorRef local = findLocalShard(shardName);
if(local != null) {
- return executeLocalOperation(local, message, duration);
+ return executeLocalOperation(local, message);
}
return null;
}
return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
- new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
- props.getMaxShardDataChangeExecutorQueueSize(),
- props.getMaxShardDataChangeListenerQueueSize(),
- props.getShardTransactionIdleTimeoutInMinutes()));
+ new DistributedDataStoreProperties(
+ props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+ props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+ props.getMaxShardDataChangeListenerQueueSize().getValue(),
+ props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+ props.getOperationTimeoutInSeconds().getValue()));
}
}
return DistributedDataStoreFactory.createInstance("operational",
getOperationalSchemaServiceDependency(),
- new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
- props.getMaxShardDataChangeExecutorQueueSize(),
- props.getMaxShardDataChangeListenerQueueSize(),
- props.getShardTransactionIdleTimeoutInMinutes()));
+ new DistributedDataStoreProperties(
+ props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+ props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+ props.getMaxShardDataChangeListenerQueueSize().getValue(),
+ props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+ props.getOperationTimeoutInSeconds().getValue()));
}
}
config:java-name-prefix DistributedOperationalDataStoreProvider;
}
+ typedef non-zero-uint16-type {
+ type uint16 {
+ range "1..max";
+ }
+ }
+
+ typedef operation-timeout-type {
+ type uint16 {
+ range "5..max";
+ }
+ }
+
grouping data-store-properties {
leaf max-shard-data-change-executor-queue-size {
default 1000;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum queue size for each shard's data store data change notification executor.";
}
leaf max-shard-data-change-executor-pool-size {
default 20;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum thread pool size for each shard's data store data change notification executor.";
}
leaf max-shard-data-change-listener-queue-size {
default 1000;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum queue size for each shard's data store data change listeners.";
}
leaf shard-transaction-idle-timeout-in-minutes {
default 10;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
}
+
+ leaf operation-timeout-in-seconds {
+ default 5;
+ type operation-timeout-type;
+ description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
+ }
}
// Augments the 'configuration' choice node under modules/module.
ActorContext
testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ .executeLocalOperation(actorRef, "messages");
Assert.assertNotNull(messages);
ActorContext
testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ .executeLocalOperation(actorRef, "messages");
Assert.assertNotNull(messages);
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
).build();
+ Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
+
//This is done so that Modification list is updated which is used during commit
- Future future =
- akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
+ Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
//ready transaction creates the cohort so that we get into the
//block where in commmit is done
ShardTransactionMessages.ReadyTransaction readyTransaction =
ShardTransactionMessages.ReadyTransaction.newBuilder().build();
- future =
- akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
+ future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
//but when the message is sent it will have the MockCommit object
//so that we can simulate throwing of exception
when(mockModification.toSerializable()).thenReturn(
PersistentMessages.CompositeModification.newBuilder().build());
- future =
- akka.pattern.Patterns.ask(subject,
- mockForwardCommitTransaction
- , 3000);
+ future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
Await.result(future, ASK_RESULT_DURATION);
}
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.util.List;
import java.util.concurrent.ExecutionException;
}
stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
- isA(requestType), any(FiniteDuration.class));
+ isA(requestType));
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
- any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
+ any(ActorSelection.class), isA(requestType));
}
private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.List;
import java.util.concurrent.TimeUnit;
return getSystem().actorSelection(actorRef.path());
}
- private FiniteDuration anyDuration() {
- return any(FiniteDuration.class);
- }
-
private CreateTransactionReply createTransactionReply(ActorRef actorRef){
return CreateTransactionReply.newBuilder()
.setTransactionActorPath(actorRef.path().toString())
when(mockActorContext).actorSelection(actorRef.path().toString());
doReturn(createTransactionReply(actorRef)).when(mockActorContext).
executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
- eqCreateTransaction(memberName, type), anyDuration());
+ eqCreateTransaction(memberName, type));
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
anyString(), eq(actorRef.path().toString()));
doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
READ_ONLY);
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
throws Throwable {
doThrow(exToThrow).when(mockActorContext).executeShardOperation(
- anyString(), any(), anyDuration());
+ anyString(), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
- anyDuration());
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
} finally {
verify(mockActorContext, times(0)).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
}
}
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(expectedNode));
doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
READ_ONLY);
doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
assertEquals("Exists response", false, exists);
doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
- anyDuration());
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
} finally {
verify(mockActorContext, times(0)).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
}
}
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
verify(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
WriteDataReply.SERIALIZABLE_CLASS);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
verify(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+ eq(actorSelection(actorRef)), eqDeleteData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.delete(TestModel.TEST_PATH);
verify(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+ eq(actorSelection(actorRef)), eqDeleteData());
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
DeleteDataReply.SERIALIZABLE_CLASS);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
- anyDuration());
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeRemoteOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
public void testReadyWithInitialCreateTransactionFailure() throws Exception {
doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
- anyString(), any(), anyDuration());
+ anyString(), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeRemoteOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+ Object out = actorContext.executeLocalShardOperation("default", "hello");
assertEquals("hello", out);
new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+ Object out = actorContext.executeLocalShardOperation("default", "hello");
assertNull(out);
ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds"));
+ Object out = actorContext.executeRemoteOperation(actor, "hello");
assertEquals("hello", out);
ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello",
- Duration.create(3, TimeUnit.SECONDS));
+ Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello");
try {
Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
-import scala.concurrent.duration.FiniteDuration;
public class MockActorContext extends ActorContext {
@Override public Object executeShardOperation(String shardName,
- Object message, FiniteDuration duration) {
+ Object message) {
return executeShardOperationResponse;
}
@Override public Object executeRemoteOperation(ActorSelection actor,
- Object message, FiniteDuration duration) {
+ Object message) {
return executeRemoteOperationResponse;
}
@Override
public Object executeLocalOperation(ActorRef actor,
- Object message, FiniteDuration duration) {
+ Object message) {
return this.executeLocalOperationResponse;
}
@Override
public Object executeLocalShardOperation(String shardName,
- Object message, FiniteDuration duration) {
+ Object message) {
return this.executeLocalShardOperationResponse;
}
}
ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf(
Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ .executeLocalOperation(actorRef, "messages");
Assert.assertNotNull(messages);