1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.argThat;
10 import static org.mockito.Matchers.eq;
11 import static org.mockito.Matchers.isA;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
17 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
18 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSelection;
21 import akka.actor.ActorSystem;
22 import akka.actor.Props;
23 import akka.dispatch.Futures;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.FutureCallback;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.Config;
31 import com.typesafe.config.ConfigFactory;
32 import java.io.IOException;
33 import java.util.List;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import org.junit.AfterClass;
38 import org.junit.Assert;
39 import org.junit.Before;
40 import org.junit.BeforeClass;
41 import org.junit.Test;
42 import org.mockito.ArgumentMatcher;
43 import org.mockito.Mock;
44 import org.mockito.Mockito;
45 import org.mockito.MockitoAnnotations;
46 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
47 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
49 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
52 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
53 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
54 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
55 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
56 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
62 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
63 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
64 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
65 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
66 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
67 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
68 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
69 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
70 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
71 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
72 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
74 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
75 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
76 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
77 import scala.concurrent.Await;
78 import scala.concurrent.Future;
79 import scala.concurrent.Promise;
80 import scala.concurrent.duration.Duration;
82 @SuppressWarnings("resource")
83 public class TransactionProxyTest {
85 @SuppressWarnings("serial")
86 static class TestException extends RuntimeException {
89 static interface Invoker {
90 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
93 private static ActorSystem system;
95 private final Configuration configuration = new MockConfiguration();
98 private ActorContext mockActorContext;
100 private SchemaContext schemaContext;
103 private ClusterWrapper mockClusterWrapper;
105 String memberName = "mock-member";
108 public static void setUpClass() throws IOException {
110 Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
111 put("akka.actor.default-dispatcher.type",
112 "akka.testkit.CallingThreadDispatcherConfigurator").build()).
113 withFallback(ConfigFactory.load());
114 system = ActorSystem.create("test", config);
118 public static void tearDownClass() throws IOException {
119 JavaTestKit.shutdownActorSystem(system);
125 MockitoAnnotations.initMocks(this);
127 schemaContext = TestModel.createTestContext();
129 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
131 doReturn(getSystem()).when(mockActorContext).getActorSystem();
132 doReturn(memberName).when(mockActorContext).getCurrentMemberName();
133 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
134 doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
135 doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
136 doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
137 doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
139 ShardStrategyFactory.setConfiguration(configuration);
142 private ActorSystem getSystem() {
146 private CreateTransaction eqCreateTransaction(final String memberName,
147 final TransactionType type) {
148 ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
150 public boolean matches(Object argument) {
151 if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
152 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
153 return obj.getTransactionId().startsWith(memberName) &&
154 obj.getTransactionType() == type.ordinal();
161 return argThat(matcher);
164 private DataExists eqSerializedDataExists() {
165 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
167 public boolean matches(Object argument) {
168 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
169 DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
173 return argThat(matcher);
176 private DataExists eqDataExists() {
177 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
179 public boolean matches(Object argument) {
180 return (argument instanceof DataExists) &&
181 ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
185 return argThat(matcher);
188 private ReadData eqSerializedReadData() {
189 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
191 public boolean matches(Object argument) {
192 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
193 ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
197 return argThat(matcher);
200 private ReadData eqReadData() {
201 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
203 public boolean matches(Object argument) {
204 return (argument instanceof ReadData) &&
205 ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
209 return argThat(matcher);
212 private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
213 return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
216 private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
217 final int transactionVersion) {
218 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
220 public boolean matches(Object argument) {
221 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
222 WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
223 (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
224 ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
226 WriteData obj = WriteData.fromSerializable(argument);
227 return obj.getPath().equals(TestModel.TEST_PATH) &&
228 obj.getData().equals(nodeToWrite);
235 return argThat(matcher);
238 private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
239 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
241 public boolean matches(Object argument) {
242 if(argument instanceof WriteData) {
243 WriteData obj = (WriteData) argument;
244 return obj.getPath().equals(TestModel.TEST_PATH) &&
245 obj.getData().equals(nodeToWrite);
251 return argThat(matcher);
254 private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
255 return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
258 private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
259 final int transactionVersion) {
260 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
262 public boolean matches(Object argument) {
263 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
264 MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
265 (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
266 ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
268 MergeData obj = MergeData.fromSerializable(argument);
269 return obj.getPath().equals(TestModel.TEST_PATH) &&
270 obj.getData().equals(nodeToWrite);
277 return argThat(matcher);
280 private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
281 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
283 public boolean matches(Object argument) {
284 if(argument instanceof MergeData) {
285 MergeData obj = ((MergeData) argument);
286 return obj.getPath().equals(TestModel.TEST_PATH) &&
287 obj.getData().equals(nodeToWrite);
294 return argThat(matcher);
297 private DeleteData eqSerializedDeleteData() {
298 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
300 public boolean matches(Object argument) {
301 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
302 DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
306 return argThat(matcher);
309 private DeleteData eqDeleteData() {
310 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
312 public boolean matches(Object argument) {
313 return argument instanceof DeleteData &&
314 ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
318 return argThat(matcher);
321 private Future<Object> readySerializedTxReply(String path) {
322 return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
325 private Future<Object> readyTxReply(String path) {
326 return Futures.successful((Object)new ReadyTransactionReply(path));
329 private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
330 short transactionVersion) {
331 return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
334 private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
335 return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
338 private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
339 return Futures.successful(new ReadDataReply(data));
342 private Future<Object> dataExistsSerializedReply(boolean exists) {
343 return Futures.successful(new DataExistsReply(exists).toSerializable());
346 private Future<DataExistsReply> dataExistsReply(boolean exists) {
347 return Futures.successful(new DataExistsReply(exists));
350 private Future<Object> writeSerializedDataReply(short version) {
351 return Futures.successful(new WriteDataReply().toSerializable(version));
354 private Future<Object> writeSerializedDataReply() {
355 return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
358 private Future<WriteDataReply> writeDataReply() {
359 return Futures.successful(new WriteDataReply());
362 private Future<Object> mergeSerializedDataReply(short version) {
363 return Futures.successful(new MergeDataReply().toSerializable(version));
366 private Future<Object> mergeSerializedDataReply() {
367 return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
370 private Future<Object> incompleteFuture(){
371 return mock(Future.class);
374 private Future<MergeDataReply> mergeDataReply() {
375 return Futures.successful(new MergeDataReply());
378 private Future<Object> deleteSerializedDataReply(short version) {
379 return Futures.successful(new DeleteDataReply().toSerializable(version));
382 private Future<Object> deleteSerializedDataReply() {
383 return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
386 private Future<DeleteDataReply> deleteDataReply() {
387 return Futures.successful(new DeleteDataReply());
390 private ActorSelection actorSelection(ActorRef actorRef) {
391 return getSystem().actorSelection(actorRef.path());
394 private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
395 return CreateTransactionReply.newBuilder()
396 .setTransactionActorPath(actorRef.path().toString())
397 .setTransactionId("txn-1")
398 .setMessageVersion(transactionVersion)
402 private ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
403 ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
404 doReturn(actorSystem.actorSelection(actorRef.path())).
405 when(mockActorContext).actorSelection(actorRef.path().toString());
407 doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
408 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
410 doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
412 doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
417 private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
418 TransactionType type, int transactionVersion) {
419 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
421 doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
422 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
423 eqCreateTransaction(memberName, type));
428 private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
429 return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
433 private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
437 future.checkedGet(5, TimeUnit.SECONDS);
438 fail("Expected ReadFailedException");
439 } catch(ReadFailedException e) {
445 public void testRead() throws Exception {
446 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
448 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
451 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
452 eq(actorSelection(actorRef)), eqSerializedReadData());
454 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
455 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
457 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
459 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
461 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
462 eq(actorSelection(actorRef)), eqSerializedReadData());
464 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
466 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
468 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
471 @Test(expected = ReadFailedException.class)
472 public void testReadWithInvalidReplyMessageType() throws Exception {
473 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
475 doReturn(Futures.successful(new Object())).when(mockActorContext).
476 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
478 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
481 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
484 @Test(expected = TestException.class)
485 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
486 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
488 doReturn(Futures.failed(new TestException())).when(mockActorContext).
489 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
491 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
494 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
497 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
499 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
501 if (exToThrow instanceof PrimaryNotFoundException) {
502 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
504 doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
505 when(mockActorContext).findPrimaryShardAsync(anyString());
508 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
509 any(ActorSelection.class), any());
511 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
513 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
516 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
517 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
519 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
520 return proxy.read(TestModel.TEST_PATH);
525 @Test(expected = PrimaryNotFoundException.class)
526 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
527 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
530 @Test(expected = TimeoutException.class)
531 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
532 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
533 new Exception("reason")));
536 @Test(expected = TestException.class)
537 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
538 testReadWithExceptionOnInitialCreateTransaction(new TestException());
541 @Test(expected = TestException.class)
542 public void testReadWithPriorRecordingOperationFailure() throws Throwable {
543 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
545 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
547 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
548 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
550 doReturn(Futures.failed(new TestException())).when(mockActorContext).
551 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
553 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
554 eq(actorSelection(actorRef)), eqSerializedReadData());
556 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
559 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
561 transactionProxy.delete(TestModel.TEST_PATH);
564 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
566 verify(mockActorContext, times(0)).executeOperationAsync(
567 eq(actorSelection(actorRef)), eqSerializedReadData());
572 public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
573 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
575 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
577 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
578 eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
580 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
581 eq(actorSelection(actorRef)), eqSerializedReadData());
583 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
586 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
588 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
589 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
591 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
593 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
596 @Test(expected=IllegalStateException.class)
597 public void testReadPreConditionCheck() {
599 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
602 transactionProxy.read(TestModel.TEST_PATH);
605 @Test(expected=IllegalArgumentException.class)
606 public void testInvalidCreateTransactionReply() throws Throwable {
607 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
609 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
610 actorSelection(actorRef.path().toString());
612 doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
613 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
615 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
616 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
618 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
620 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
624 public void testExists() throws Exception {
625 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
627 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
630 doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
631 eq(actorSelection(actorRef)), eqSerializedDataExists());
633 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
635 assertEquals("Exists response", false, exists);
637 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
638 eq(actorSelection(actorRef)), eqSerializedDataExists());
640 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
642 assertEquals("Exists response", true, exists);
645 @Test(expected = PrimaryNotFoundException.class)
646 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
647 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
649 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
650 return proxy.exists(TestModel.TEST_PATH);
655 @Test(expected = ReadFailedException.class)
656 public void testExistsWithInvalidReplyMessageType() throws Exception {
657 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
659 doReturn(Futures.successful(new Object())).when(mockActorContext).
660 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
662 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
665 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
668 @Test(expected = TestException.class)
669 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
670 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
672 doReturn(Futures.failed(new TestException())).when(mockActorContext).
673 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
675 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
678 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
681 @Test(expected = TestException.class)
682 public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
683 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
685 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
687 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
688 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
690 doReturn(Futures.failed(new TestException())).when(mockActorContext).
691 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
693 doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
694 eq(actorSelection(actorRef)), eqSerializedDataExists());
696 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
699 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
701 transactionProxy.delete(TestModel.TEST_PATH);
704 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
706 verify(mockActorContext, times(0)).executeOperationAsync(
707 eq(actorSelection(actorRef)), eqSerializedDataExists());
712 public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
713 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
715 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
717 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
718 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
720 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
721 eq(actorSelection(actorRef)), eqSerializedDataExists());
723 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
726 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
728 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
730 assertEquals("Exists response", true, exists);
733 @Test(expected=IllegalStateException.class)
734 public void testExistsPreConditionCheck() {
736 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
739 transactionProxy.exists(TestModel.TEST_PATH);
742 private void verifyRecordingOperationFutures(List<Future<Object>> futures,
743 Class<?>... expResultTypes) throws Exception {
744 assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
747 for( Future<Object> future: futures) {
748 assertNotNull("Recording operation Future is null", future);
750 Class<?> expResultType = expResultTypes[i++];
751 if(Throwable.class.isAssignableFrom(expResultType)) {
753 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
754 fail("Expected exception from recording operation Future");
755 } catch(Exception e) {
759 assertEquals("Recording operation Future result type", expResultType,
760 Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
766 public void testWrite() throws Exception {
767 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
769 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
771 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
772 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
774 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
777 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
779 verify(mockActorContext).executeOperationAsync(
780 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
782 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
783 WriteDataReply.class);
787 public void testWriteAfterAsyncRead() throws Throwable {
788 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
790 Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
791 doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
792 eq(getSystem().actorSelection(actorRef.path())),
793 eqCreateTransaction(memberName, READ_WRITE));
795 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
796 eq(actorSelection(actorRef)), eqSerializedReadData());
798 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
800 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
801 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
803 final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
805 final CountDownLatch readComplete = new CountDownLatch(1);
806 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
807 com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
808 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
810 public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
812 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
813 } catch (Exception e) {
816 readComplete.countDown();
821 public void onFailure(Throwable t) {
823 readComplete.countDown();
827 createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
829 Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
831 if(caughtEx.get() != null) {
832 throw caughtEx.get();
835 verify(mockActorContext).executeOperationAsync(
836 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
838 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
839 WriteDataReply.class);
842 @Test(expected=IllegalStateException.class)
843 public void testWritePreConditionCheck() {
845 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
848 transactionProxy.write(TestModel.TEST_PATH,
849 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
852 @Test(expected=IllegalStateException.class)
853 public void testWriteAfterReadyPreConditionCheck() {
855 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
858 transactionProxy.ready();
860 transactionProxy.write(TestModel.TEST_PATH,
861 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
865 public void testMerge() throws Exception {
866 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
868 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
870 doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
871 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
873 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
875 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
877 verify(mockActorContext).executeOperationAsync(
878 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
880 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
881 MergeDataReply.class);
885 public void testDelete() throws Exception {
886 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
888 doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
889 eq(actorSelection(actorRef)), eqSerializedDeleteData());
891 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
894 transactionProxy.delete(TestModel.TEST_PATH);
896 verify(mockActorContext).executeOperationAsync(
897 eq(actorSelection(actorRef)), eqSerializedDeleteData());
899 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
900 DeleteDataReply.class);
903 private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
904 Object... expReplies) throws Exception {
905 assertEquals("getReadyOperationFutures size", expReplies.length,
906 proxy.getCohortFutures().size());
909 for( Future<ActorSelection> future: proxy.getCohortFutures()) {
910 assertNotNull("Ready operation Future is null", future);
912 Object expReply = expReplies[i++];
913 if(expReply instanceof ActorSelection) {
914 ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
915 assertEquals("Cohort actor path", expReply, actual);
917 // Expecting exception.
919 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
920 fail("Expected exception from ready operation Future");
921 } catch(Exception e) {
929 public void testReady() throws Exception {
930 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
932 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
934 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
935 eq(actorSelection(actorRef)), eqSerializedReadData());
937 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
938 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
940 doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
941 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
943 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
946 transactionProxy.read(TestModel.TEST_PATH);
948 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
950 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
952 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
954 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
956 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
957 WriteDataReply.class);
959 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
962 private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
963 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
964 READ_WRITE, version);
966 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
968 doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
969 eq(actorSelection(actorRef)), eqSerializedReadData());
971 doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
972 eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
974 doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
975 eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
977 doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
978 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
980 doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
981 eq(actorRef.path().toString()));
983 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
985 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
986 get(5, TimeUnit.SECONDS);
988 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
989 assertEquals("Response NormalizedNode", testNode, readOptional.get());
991 transactionProxy.write(TestModel.TEST_PATH, testNode);
993 transactionProxy.merge(TestModel.TEST_PATH, testNode);
995 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
997 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
999 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1001 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1002 ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
1004 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
1010 public void testCompatibilityWithBaseHeliumVersion() throws Exception {
1011 ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
1013 verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
1014 eq(actorRef.path().toString()));
1018 public void testCompatibilityWithHeliumR1Version() throws Exception {
1019 ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
1021 verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
1022 eq(actorRef.path().toString()));
1026 public void testReadyWithRecordingOperationFailure() throws Exception {
1027 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1029 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1031 doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1032 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
1034 doReturn(Futures.failed(new TestException())).when(mockActorContext).
1035 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
1037 doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
1038 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
1040 doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
1042 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1045 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1047 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1049 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1051 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1053 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1055 verifyCohortFutures(proxy, TestException.class);
1057 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1058 MergeDataReply.class, TestException.class);
1062 public void testReadyWithReplyFailure() throws Exception {
1063 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1065 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1067 doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1068 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
1070 doReturn(Futures.failed(new TestException())).when(mockActorContext).
1071 executeOperationAsync(eq(actorSelection(actorRef)),
1072 isA(ReadyTransaction.SERIALIZABLE_CLASS));
1074 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1077 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1079 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1081 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1083 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1085 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1086 MergeDataReply.class);
1088 verifyCohortFutures(proxy, TestException.class);
1092 public void testReadyWithInitialCreateTransactionFailure() throws Exception {
1094 doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
1095 mockActorContext).findPrimaryShardAsync(anyString());
1097 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1100 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1102 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1104 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1106 transactionProxy.delete(TestModel.TEST_PATH);
1108 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1110 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1112 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1114 verifyCohortFutures(proxy, PrimaryNotFoundException.class);
1118 public void testReadyWithInvalidReplyMessageType() throws Exception {
1119 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1121 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1123 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1124 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
1126 doReturn(Futures.successful(new Object())).when(mockActorContext).
1127 executeOperationAsync(eq(actorSelection(actorRef)),
1128 isA(ReadyTransaction.SERIALIZABLE_CLASS));
1130 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1133 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1135 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1137 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1139 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1141 verifyCohortFutures(proxy, IllegalArgumentException.class);
1145 public void testGetIdentifier() {
1146 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
1147 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1148 TransactionProxy.TransactionType.READ_ONLY);
1150 Object id = transactionProxy.getIdentifier();
1151 assertNotNull("getIdentifier returned null", id);
1152 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
1156 public void testClose() throws Exception{
1157 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1159 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
1160 eq(actorSelection(actorRef)), eqSerializedReadData());
1162 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1165 transactionProxy.read(TestModel.TEST_PATH);
1167 transactionProxy.close();
1169 verify(mockActorContext).sendOperationAsync(
1170 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
1175 * Method to test a local Tx actor. The Tx paths are matched to decide if the
1176 * Tx actor is local or not. This is done by mocking the Tx actor path
1177 * and the caller paths and ensuring that the paths have the remote-address format
1179 * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
1180 * the paths returned for the actors for all the tests are not qualified remote paths.
1181 * Hence are treated as non-local/remote actors. In short, all tests except
1182 * few below run for remote actors
1187 public void testLocalTxActorRead() throws Exception {
1188 ActorSystem actorSystem = getSystem();
1189 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1191 doReturn(actorSystem.actorSelection(shardActorRef.path())).
1192 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1194 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1195 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1197 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1198 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1199 .setTransactionId("txn-1")
1200 .setTransactionActorPath(actorPath)
1203 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1204 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1205 eqCreateTransaction(memberName, READ_ONLY));
1207 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1209 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
1211 // negative test case with null as the reply
1212 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
1213 any(ActorSelection.class), eqReadData());
1215 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1216 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1218 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
1220 // test case with node as read data reply
1221 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1223 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1224 any(ActorSelection.class), eqReadData());
1226 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1228 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1230 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
1232 // test for local data exists
1233 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1234 any(ActorSelection.class), eqDataExists());
1236 boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1238 assertEquals("Exists response", true, exists);
1242 public void testLocalTxActorWrite() throws Exception {
1243 ActorSystem actorSystem = getSystem();
1244 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1246 doReturn(actorSystem.actorSelection(shardActorRef.path())).
1247 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1249 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1250 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1252 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1253 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1254 .setTransactionId("txn-1")
1255 .setTransactionActorPath(actorPath)
1258 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1259 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1260 eqCreateTransaction(memberName, WRITE_ONLY));
1262 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1264 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1266 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1267 any(ActorSelection.class), eqWriteData(nodeToWrite));
1269 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
1270 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1272 verify(mockActorContext).executeOperationAsync(
1273 any(ActorSelection.class), eqWriteData(nodeToWrite));
1275 //testing local merge
1276 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1277 any(ActorSelection.class), eqMergeData(nodeToWrite));
1279 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1281 verify(mockActorContext).executeOperationAsync(
1282 any(ActorSelection.class), eqMergeData(nodeToWrite));
1285 //testing local delete
1286 doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1287 any(ActorSelection.class), eqDeleteData());
1289 transactionProxy.delete(TestModel.TEST_PATH);
1291 verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
1293 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1294 WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
1297 doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
1298 any(ActorSelection.class), isA(ReadyTransaction.class));
1300 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1302 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1304 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1306 verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
1309 private static interface TransactionProxyOperation {
1310 void run(TransactionProxy transactionProxy);
1313 private void throttleOperation(TransactionProxyOperation operation) {
1314 throttleOperation(operation, 1, true);
1317 private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
1318 ActorSystem actorSystem = getSystem();
1319 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1321 doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
1323 doReturn(actorSystem.actorSelection(shardActorRef.path())).
1324 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1327 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1328 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1330 doReturn(Futures.failed(new Exception("not found")))
1331 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1334 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1335 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1336 .setTransactionId("txn-1")
1337 .setTransactionActorPath(actorPath)
1340 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1341 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1342 eqCreateTransaction(memberName, READ_WRITE));
1344 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1346 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1348 long start = System.currentTimeMillis();
1350 operation.run(transactionProxy);
1352 long end = System.currentTimeMillis();
1354 Assert.assertTrue(String.format("took less time than expected %s was %s",
1355 mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
1356 (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
1360 private void completeOperation(TransactionProxyOperation operation){
1361 completeOperation(operation, true);
1364 private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
1365 ActorSystem actorSystem = getSystem();
1366 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1368 doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
1370 doReturn(actorSystem.actorSelection(shardActorRef.path())).
1371 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1374 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1375 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1377 doReturn(Futures.failed(new Exception("not found")))
1378 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1381 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1382 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1383 .setTransactionId("txn-1")
1384 .setTransactionActorPath(actorPath)
1387 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1388 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1389 eqCreateTransaction(memberName, READ_WRITE));
1391 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1393 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1395 long start = System.currentTimeMillis();
1397 operation.run(transactionProxy);
1399 long end = System.currentTimeMillis();
1401 Assert.assertTrue(String.format("took more time than expected %s was %s",
1402 mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
1403 (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
1406 public void testWriteThrottling(boolean shardFound){
1408 throttleOperation(new TransactionProxyOperation() {
1410 public void run(TransactionProxy transactionProxy) {
1411 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1413 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1414 any(ActorSelection.class), eqWriteData(nodeToWrite));
1416 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1418 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1424 public void testWriteThrottlingWhenShardFound(){
1425 throttleOperation(new TransactionProxyOperation() {
1427 public void run(TransactionProxy transactionProxy) {
1428 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1430 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1431 any(ActorSelection.class), eqWriteData(nodeToWrite));
1433 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1435 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1442 public void testWriteThrottlingWhenShardNotFound(){
1443 // Confirm that there is no throttling when the Shard is not found
1444 completeOperation(new TransactionProxyOperation() {
1446 public void run(TransactionProxy transactionProxy) {
1447 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1449 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1450 any(ActorSelection.class), eqWriteData(nodeToWrite));
1452 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1454 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1462 public void testWriteCompletion(){
1463 completeOperation(new TransactionProxyOperation() {
1465 public void run(TransactionProxy transactionProxy) {
1466 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1468 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1469 any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
1471 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1473 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1480 public void testMergeThrottlingWhenShardFound(){
1482 throttleOperation(new TransactionProxyOperation() {
1484 public void run(TransactionProxy transactionProxy) {
1485 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1487 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1488 any(ActorSelection.class), eqMergeData(nodeToMerge));
1490 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1492 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1498 public void testMergeThrottlingWhenShardNotFound(){
1500 completeOperation(new TransactionProxyOperation() {
1502 public void run(TransactionProxy transactionProxy) {
1503 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1505 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1506 any(ActorSelection.class), eqMergeData(nodeToMerge));
1508 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1510 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1516 public void testMergeCompletion(){
1517 completeOperation(new TransactionProxyOperation() {
1519 public void run(TransactionProxy transactionProxy) {
1520 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1522 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1523 any(ActorSelection.class), eqMergeData(nodeToMerge));
1525 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1527 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1534 public void testDeleteThrottlingWhenShardFound(){
1536 throttleOperation(new TransactionProxyOperation() {
1538 public void run(TransactionProxy transactionProxy) {
1539 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1540 any(ActorSelection.class), eqDeleteData());
1542 transactionProxy.delete(TestModel.TEST_PATH);
1544 transactionProxy.delete(TestModel.TEST_PATH);
1551 public void testDeleteThrottlingWhenShardNotFound(){
1553 completeOperation(new TransactionProxyOperation() {
1555 public void run(TransactionProxy transactionProxy) {
1556 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1557 any(ActorSelection.class), eqDeleteData());
1559 transactionProxy.delete(TestModel.TEST_PATH);
1561 transactionProxy.delete(TestModel.TEST_PATH);
1567 public void testDeleteCompletion(){
1568 completeOperation(new TransactionProxyOperation() {
1570 public void run(TransactionProxy transactionProxy) {
1571 doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1572 any(ActorSelection.class), eqDeleteData());
1574 transactionProxy.delete(TestModel.TEST_PATH);
1576 transactionProxy.delete(TestModel.TEST_PATH);
1583 public void testReadThrottlingWhenShardFound(){
1585 throttleOperation(new TransactionProxyOperation() {
1587 public void run(TransactionProxy transactionProxy) {
1588 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1589 any(ActorSelection.class), eqReadData());
1591 transactionProxy.read(TestModel.TEST_PATH);
1593 transactionProxy.read(TestModel.TEST_PATH);
1599 public void testReadThrottlingWhenShardNotFound(){
1601 completeOperation(new TransactionProxyOperation() {
1603 public void run(TransactionProxy transactionProxy) {
1604 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1605 any(ActorSelection.class), eqReadData());
1607 transactionProxy.read(TestModel.TEST_PATH);
1609 transactionProxy.read(TestModel.TEST_PATH);
1616 public void testReadCompletion(){
1617 completeOperation(new TransactionProxyOperation() {
1619 public void run(TransactionProxy transactionProxy) {
1620 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1622 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1623 any(ActorSelection.class), eqReadData());
1625 transactionProxy.read(TestModel.TEST_PATH);
1627 transactionProxy.read(TestModel.TEST_PATH);
1634 public void testExistsThrottlingWhenShardFound(){
1636 throttleOperation(new TransactionProxyOperation() {
1638 public void run(TransactionProxy transactionProxy) {
1639 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1640 any(ActorSelection.class), eqDataExists());
1642 transactionProxy.exists(TestModel.TEST_PATH);
1644 transactionProxy.exists(TestModel.TEST_PATH);
1650 public void testExistsThrottlingWhenShardNotFound(){
1652 completeOperation(new TransactionProxyOperation() {
1654 public void run(TransactionProxy transactionProxy) {
1655 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1656 any(ActorSelection.class), eqDataExists());
1658 transactionProxy.exists(TestModel.TEST_PATH);
1660 transactionProxy.exists(TestModel.TEST_PATH);
1667 public void testExistsCompletion(){
1668 completeOperation(new TransactionProxyOperation() {
1670 public void run(TransactionProxy transactionProxy) {
1671 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1672 any(ActorSelection.class), eqDataExists());
1674 transactionProxy.exists(TestModel.TEST_PATH);
1676 transactionProxy.exists(TestModel.TEST_PATH);
1683 public void testReadyThrottling(){
1685 throttleOperation(new TransactionProxyOperation() {
1687 public void run(TransactionProxy transactionProxy) {
1688 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1690 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1691 any(ActorSelection.class), eqWriteData(nodeToWrite));
1693 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1694 any(ActorSelection.class), any(ReadyTransaction.class));
1696 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1698 transactionProxy.ready();
1704 public void testReadyThrottlingWithTwoTransactionContexts(){
1706 throttleOperation(new TransactionProxyOperation() {
1708 public void run(TransactionProxy transactionProxy) {
1709 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1710 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1712 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1713 any(ActorSelection.class), eqWriteData(nodeToWrite));
1715 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1716 any(ActorSelection.class), eqWriteData(carsNode));
1718 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1719 any(ActorSelection.class), any(ReadyTransaction.class));
1721 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1723 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1725 transactionProxy.ready();