+ private ActorSystem getSystem() {
+ return system;
+ }
+
+ private CreateTransaction eqCreateTransaction(final String memberName,
+ final TransactionType type) {
+ ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
+ @Override
+ public boolean matches(Object argument) {
+ CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+ return obj.getTransactionId().startsWith(memberName) &&
+ obj.getTransactionType() == type.ordinal();
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private DataExists eqSerializedDataExists() {
+ ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+ @Override
+ public boolean matches(Object argument) {
+ return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private DataExists eqDataExists() {
+ ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+ @Override
+ public boolean matches(Object argument) {
+ return (argument instanceof DataExists) &&
+ ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private ReadData eqSerializedReadData() {
+ ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+ @Override
+ public boolean matches(Object argument) {
+ return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private ReadData eqReadData() {
+ ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+ @Override
+ public boolean matches(Object argument) {
+ return (argument instanceof ReadData) &&
+ ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ return false;
+ }
+
+ WriteData obj = WriteData.fromSerializable(argument, schemaContext);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ if(argument instanceof WriteData) {
+ WriteData obj = (WriteData) argument;
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+ return false;
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
+ @Override
+ public boolean matches(Object argument) {
+ if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ return false;
+ }
+
+ MergeData obj = MergeData.fromSerializable(argument, schemaContext);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
+ @Override
+ public boolean matches(Object argument) {
+ if(argument instanceof MergeData) {
+ MergeData obj = ((MergeData) argument);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+
+ return false;
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private DeleteData eqSerializedDeleteData() {
+ ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private DeleteData eqDeleteData() {
+ ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ return argument instanceof DeleteData &&
+ ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private Future<Object> readySerializedTxReply(String path) {
+ return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
+ }
+
+ private Future<Object> readyTxReply(String path) {
+ return Futures.successful((Object)new ReadyTransactionReply(path));
+ }
+
+
+ private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
+ return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
+ }
+
+ private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
+ return Futures.successful(new ReadDataReply(schemaContext, data));
+ }
+
+ private Future<Object> dataExistsSerializedReply(boolean exists) {
+ return Futures.successful(new DataExistsReply(exists).toSerializable());
+ }
+
+ private Future<DataExistsReply> dataExistsReply(boolean exists) {
+ return Futures.successful(new DataExistsReply(exists));
+ }
+
+ private Future<Object> writeSerializedDataReply() {
+ return Futures.successful(new WriteDataReply().toSerializable());
+ }
+
+ private Future<WriteDataReply> writeDataReply() {
+ return Futures.successful(new WriteDataReply());
+ }
+
+ private Future<Object> mergeSerializedDataReply() {
+ return Futures.successful(new MergeDataReply().toSerializable());
+ }
+
+ private Future<MergeDataReply> mergeDataReply() {
+ return Futures.successful(new MergeDataReply());
+ }
+
+ private Future<Object> deleteSerializedDataReply() {
+ return Futures.successful(new DeleteDataReply().toSerializable());
+ }
+
+ private Future<DeleteDataReply> deleteDataReply() {
+ return Futures.successful(new DeleteDataReply());
+ }
+
+ private ActorSelection actorSelection(ActorRef actorRef) {
+ return getSystem().actorSelection(actorRef.path());
+ }
+
+ private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
+ return CreateTransactionReply.newBuilder()
+ .setTransactionActorPath(actorRef.path().toString())
+ .setTransactionId("txn-1")
+ .setMessageVersion(transactionVersion)
+ .build();
+ }
+
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
+ ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+ doReturn(actorSystem.actorSelection(actorRef.path())).
+ when(mockActorContext).actorSelection(actorRef.path().toString());
+
+ doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
+ eqCreateTransaction(memberName, type));
+
+ doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
+
+ return actorRef;
+ }
+
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+ return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+ }
+
+
+ private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
+ throws Throwable {
+
+ try {
+ future.checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected ReadFailedException");
+ } catch(ReadFailedException e) {
+ throw e.getCause();
+ }
+ }
+