+ @SuppressWarnings("serial")
+ static class TestException extends RuntimeException {
+ }
+
+ static interface Invoker {
+ CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
+ }
+
+ private final Configuration configuration = new MockConfiguration();
+
+ @Mock
+ private ActorContext mockActorContext;
+
+ private SchemaContext schemaContext;
+
+ String memberName = "mock-member";
+
+ @Before
+ public void setUp(){
+ MockitoAnnotations.initMocks(this);
+
+ schemaContext = TestModel.createTestContext();
+
+ doReturn(getSystem()).when(mockActorContext).getActorSystem();
+ doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+ doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+
+ ShardStrategyFactory.setConfiguration(configuration);
+ }
+
+ private CreateTransaction eqCreateTransaction(final String memberName,
+ final TransactionType type) {
+ ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
+ @Override
+ public boolean matches(Object argument) {
+ CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+ return obj.getTransactionId().startsWith(memberName) &&
+ obj.getTransactionType() == type.ordinal();
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private DataExists eqDataExists() {
+ ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+ @Override
+ public boolean matches(Object argument) {
+ return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private ReadData eqReadData() {
+ ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+ @Override
+ public boolean matches(Object argument) {
+ return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ return false;
+ }
+
+ WriteData obj = WriteData.fromSerializable(argument, schemaContext);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
+ @Override
+ public boolean matches(Object argument) {
+ if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ return false;
+ }
+
+ MergeData obj = MergeData.fromSerializable(argument, schemaContext);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private DeleteData eqDeleteData() {
+ ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private Future<Object> readyTxReply(String path) {
+ return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
+ }
+
+ private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
+ return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
+ }
+
+ private Future<Object> dataExistsReply(boolean exists) {
+ return Futures.successful(new DataExistsReply(exists).toSerializable());
+ }
+
+ private Future<Object> writeDataReply() {
+ return Futures.successful(new WriteDataReply().toSerializable());
+ }
+
+ private Future<Object> mergeDataReply() {
+ return Futures.successful(new MergeDataReply().toSerializable());
+ }
+
+ private Future<Object> deleteDataReply() {
+ return Futures.successful(new DeleteDataReply().toSerializable());
+ }
+
+ private ActorSelection actorSelection(ActorRef actorRef) {
+ return getSystem().actorSelection(actorRef.path());
+ }
+
+ private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+ return CreateTransactionReply.newBuilder()
+ .setTransactionActorPath(actorRef.path().toString())
+ .setTransactionId("txn-1").build();
+ }
+
+ private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
+ ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
+ doReturn(getSystem().actorSelection(actorRef.path())).
+ when(mockActorContext).actorSelection(actorRef.path().toString());
+
+ doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ doReturn(createTransactionReply(actorRef)).when(mockActorContext).
+ executeOperation(eq(getSystem().actorSelection(actorRef.path())),
+ eqCreateTransaction(memberName, type));
+ return actorRef;
+ }
+
+ private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
+ throws Throwable {
+
+ try {
+ future.checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected ReadFailedException");
+ } catch(ReadFailedException e) {
+ throw e.getCause();
+ }
+ }