1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.argThat;
10 import static org.mockito.Matchers.eq;
11 import static org.mockito.Matchers.isA;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
17 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
18 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSelection;
21 import akka.actor.ActorSystem;
22 import akka.actor.Props;
23 import akka.dispatch.Futures;
24 import akka.testkit.JavaTestKit;
25 import com.google.common.base.Optional;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.FutureCallback;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.Config;
31 import com.typesafe.config.ConfigFactory;
32 import java.io.IOException;
33 import java.util.List;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import org.junit.AfterClass;
38 import org.junit.Assert;
39 import org.junit.Before;
40 import org.junit.BeforeClass;
41 import org.junit.Test;
42 import org.mockito.ArgumentMatcher;
43 import org.mockito.Mock;
44 import org.mockito.Mockito;
45 import org.mockito.MockitoAnnotations;
46 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
47 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
49 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
52 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
53 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
54 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
55 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
56 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
62 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
63 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
64 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
65 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
66 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
67 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
68 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
69 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
70 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
71 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
72 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
74 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
75 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
76 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
77 import scala.concurrent.Await;
78 import scala.concurrent.Future;
79 import scala.concurrent.Promise;
80 import scala.concurrent.duration.Duration;
82 @SuppressWarnings("resource")
83 public class TransactionProxyTest {
// Marker exception the tests inject as a failure (e.g. via Futures.failed) so they can assert
// that this exact exception type is what gets propagated back to the caller.
85 @SuppressWarnings("serial")
86 static class TestException extends RuntimeException {
// Callback that runs one operation on a TransactionProxy and hands back the resulting
// CheckedFuture, so a shared helper can exercise different operations (read, exists).
89 static interface Invoker {
90 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
// ActorSystem shared by all tests; created in setUpClass() and shut down in tearDownClass().
93 private static ActorSystem system;
// Configuration handed to ShardStrategyFactory during per-test setup.
95 private final Configuration configuration = new MockConfiguration();
// Stubbed throughout with doReturn(...).when(mockActorContext).
// NOTE(review): presumably a Mockito-annotated mock; the annotation is not visible in this excerpt.
98 private ActorContext mockActorContext;
// Assigned from TestModel.createTestContext() during per-test setup.
100 private SchemaContext schemaContext;
// Returned by the stubbed mockActorContext.getClusterWrapper().
103 private ClusterWrapper mockClusterWrapper;
// Stubbed as the current member name; also the prefix expected on created transaction ids.
105 String memberName = "mock-member";
// Creates the shared "test" ActorSystem. The default dispatcher type is overridden with the
// testkit's CallingThreadDispatcherConfigurator; every other setting falls back to
// ConfigFactory.load().
108 public static void setUpClass() throws IOException {
110 Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
111 put("akka.actor.default-dispatcher.type",
112 "akka.testkit.CallingThreadDispatcherConfigurator").build()).
113 withFallback(ConfigFactory.load());
114 system = ActorSystem.create("test", config);
// Shuts down the shared ActorSystem through JavaTestKit.
118 public static void tearDownClass() throws IOException {
119 JavaTestKit.shutdownActorSystem(system);
// Per-test fixture setup. NOTE(review): the enclosing method header is not visible in this excerpt.
// Initialises the Mockito-annotated fields of this test instance.
125 MockitoAnnotations.initMocks(this);
127 schemaContext = TestModel.createTestContext();
// Datastore context with the operation timeout set to 2 seconds.
129 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
// Stub the ActorContext getters with the shared actor system, member name, schema and contexts.
131 doReturn(getSystem()).when(mockActorContext).getActorSystem();
132 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
133 doReturn(memberName).when(mockActorContext).getCurrentMemberName();
134 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
135 doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
// NOTE(review): identical to the stubbing on the previous line; the repetition appears redundant.
136 doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
137 doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
138 doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
// Install the mock configuration into the static ShardStrategyFactory.
140 ShardStrategyFactory.setConfiguration(configuration);
// Accessor for the ActorSystem used by the helpers and tests (body not visible in this excerpt).
143 private ActorSystem getSystem() {
// Mockito argument matcher for a serialized CreateTransaction message. An argument is accepted
// when its class is CreateTransaction.SERIALIZABLE_CLASS, its transaction id starts with
// memberName and its transaction type equals type.ordinal().
147 private CreateTransaction eqCreateTransaction(final String memberName,
148 final TransactionType type) {
149 ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
151 public boolean matches(Object argument) {
152 if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
// Deserialize before inspecting the id and type.
153 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
154 return obj.getTransactionId().startsWith(memberName) &&
155 obj.getTransactionType() == type.ordinal();
// Registers the matcher with Mockito for use inside a when(...)/verify(...) call.
162 return argThat(matcher);
// Matcher for the serialized form of a DataExists message whose path equals TestModel.TEST_PATH.
165 private DataExists eqSerializedDataExists() {
166 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
168 public boolean matches(Object argument) {
169 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
170 DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
174 return argThat(matcher);
// Matcher for an unserialized DataExists instance whose path equals TestModel.TEST_PATH.
177 private DataExists eqDataExists() {
178 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
180 public boolean matches(Object argument) {
181 return (argument instanceof DataExists) &&
182 ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
186 return argThat(matcher);
// Matcher for the serialized form of a ReadData message whose path equals TestModel.TEST_PATH.
189 private ReadData eqSerializedReadData() {
190 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
192 public boolean matches(Object argument) {
193 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
194 ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
198 return argThat(matcher);
// Matcher for an unserialized ReadData instance whose path equals TestModel.TEST_PATH.
201 private ReadData eqReadData() {
202 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
204 public boolean matches(Object argument) {
205 return (argument instanceof ReadData) &&
206 ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
210 return argThat(matcher);
// Convenience overload: matches serialized WriteData for nodeToWrite at DataStoreVersions.CURRENT_VERSION.
213 private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
214 return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
// Matcher for a serialized WriteData message. The accepted argument class depends on
// transactionVersion: WriteData.SERIALIZABLE_CLASS at or above LITHIUM_VERSION, otherwise
// ShardTransactionMessages.WriteData. The deserialized message must then target
// TestModel.TEST_PATH and carry data equal to nodeToWrite.
217 private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
218 final int transactionVersion) {
219 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
221 public boolean matches(Object argument) {
222 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
223 WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
224 (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
225 ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
227 WriteData obj = WriteData.fromSerializable(argument);
228 return obj.getPath().equals(TestModel.TEST_PATH) &&
229 obj.getData().equals(nodeToWrite);
236 return argThat(matcher);
// Matcher for an unserialized WriteData instance targeting TestModel.TEST_PATH with data equal
// to nodeToWrite.
239 private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
240 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
242 public boolean matches(Object argument) {
243 if(argument instanceof WriteData) {
244 WriteData obj = (WriteData) argument;
245 return obj.getPath().equals(TestModel.TEST_PATH) &&
246 obj.getData().equals(nodeToWrite);
252 return argThat(matcher);
// Convenience overload: matches serialized MergeData for nodeToWrite at DataStoreVersions.CURRENT_VERSION.
255 private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
256 return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
// Matcher for a serialized MergeData message. The accepted argument class depends on
// transactionVersion: MergeData.SERIALIZABLE_CLASS at or above LITHIUM_VERSION, otherwise
// ShardTransactionMessages.MergeData. The deserialized message must then target
// TestModel.TEST_PATH and carry data equal to nodeToWrite.
259 private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
260 final int transactionVersion) {
261 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
263 public boolean matches(Object argument) {
264 if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
265 MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
266 (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
267 ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
269 MergeData obj = MergeData.fromSerializable(argument);
270 return obj.getPath().equals(TestModel.TEST_PATH) &&
271 obj.getData().equals(nodeToWrite);
278 return argThat(matcher);
// Matcher for an unserialized MergeData instance targeting TestModel.TEST_PATH with data equal
// to nodeToWrite.
281 private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
282 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
284 public boolean matches(Object argument) {
285 if(argument instanceof MergeData) {
286 MergeData obj = ((MergeData) argument);
287 return obj.getPath().equals(TestModel.TEST_PATH) &&
288 obj.getData().equals(nodeToWrite);
295 return argThat(matcher);
// Matcher for the serialized form of a DeleteData message whose path equals TestModel.TEST_PATH.
298 private DeleteData eqSerializedDeleteData() {
299 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
301 public boolean matches(Object argument) {
302 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
303 DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
307 return argThat(matcher);
// Matcher for an unserialized DeleteData instance whose path equals TestModel.TEST_PATH.
310 private DeleteData eqDeleteData() {
311 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
313 public boolean matches(Object argument) {
314 return argument instanceof DeleteData &&
315 ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
319 return argThat(matcher);
// Already-completed Future holding the serializable form of a ReadyTransactionReply for path.
322 private Future<Object> readySerializedTxReply(String path) {
323 return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
// Already-completed Future holding an unserialized ReadyTransactionReply for path.
326 private Future<Object> readyTxReply(String path) {
327 return Futures.successful((Object)new ReadyTransactionReply(path));
// Already-completed Future holding a ReadDataReply for data, serialized for transactionVersion.
330 private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
331 short transactionVersion) {
332 return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
// Convenience overload that serializes the reply for DataStoreVersions.CURRENT_VERSION.
335 private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
336 return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
// Already-completed Future holding an unserialized ReadDataReply for data.
339 private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
340 return Futures.successful(new ReadDataReply(data));
// Already-completed Future holding the serializable form of a DataExistsReply with the given flag.
343 private Future<Object> dataExistsSerializedReply(boolean exists) {
344 return Futures.successful(new DataExistsReply(exists).toSerializable());
// Already-completed Future holding an unserialized DataExistsReply with the given flag.
347 private Future<DataExistsReply> dataExistsReply(boolean exists) {
348 return Futures.successful(new DataExistsReply(exists));
// Already-completed Future holding a WriteDataReply serialized for the given version.
351 private Future<Object> writeSerializedDataReply(short version) {
352 return Futures.successful(new WriteDataReply().toSerializable(version));
// Convenience overload that serializes the reply for DataStoreVersions.CURRENT_VERSION.
355 private Future<Object> writeSerializedDataReply() {
356 return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
// Already-completed Future holding an unserialized WriteDataReply.
359 private Future<WriteDataReply> writeDataReply() {
360 return Futures.successful(new WriteDataReply());
// Already-completed Future holding a MergeDataReply serialized for the given version.
363 private Future<Object> mergeSerializedDataReply(short version) {
364 return Futures.successful(new MergeDataReply().toSerializable(version));
// Convenience overload that serializes the reply for DataStoreVersions.CURRENT_VERSION.
367 private Future<Object> mergeSerializedDataReply() {
368 return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
// Returns a bare Mockito mock of Future with no stubbing applied.
// NOTE(review): presumably stands in for a future that never completes — confirm against callers.
371 private Future<Object> incompleteFuture(){
372 return mock(Future.class);
// Already-completed Future holding an unserialized MergeDataReply.
375 private Future<MergeDataReply> mergeDataReply() {
376 return Futures.successful(new MergeDataReply());
// Already-completed Future holding a DeleteDataReply serialized for the given version.
379 private Future<Object> deleteSerializedDataReply(short version) {
380 return Futures.successful(new DeleteDataReply().toSerializable(version));
// Convenience overload that serializes the reply for DataStoreVersions.CURRENT_VERSION.
383 private Future<Object> deleteSerializedDataReply() {
384 return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
// Already-completed Future holding an unserialized DeleteDataReply.
387 private Future<DeleteDataReply> deleteDataReply() {
388 return Futures.successful(new DeleteDataReply());
// Resolves an ActorSelection for actorRef's path within the test ActorSystem.
391 private ActorSelection actorSelection(ActorRef actorRef) {
392 return getSystem().actorSelection(actorRef.path());
// Starts building a CreateTransactionReply that points at actorRef's path, uses the fixed
// transaction id "txn-1" and carries transactionVersion as the message version.
// (The end of the builder chain is not visible in this excerpt.)
395 private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
396 return CreateTransactionReply.newBuilder()
397 .setTransactionActorPath(actorRef.path().toString())
398 .setTransactionId("txn-1")
399 .setMessageVersion(transactionVersion)
// Creates a DoNothingActor and stubs mockActorContext around it, without stubbing any
// CreateTransaction reply.
403 private ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
404 ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
// Looking up the actor's path string yields a selection for the actor.
405 doReturn(actorSystem.actorSelection(actorRef.path())).
406 when(mockActorContext).actorSelection(actorRef.path().toString());
// The primary-shard lookup for the default shard completes successfully with that selection.
408 doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
409 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// The actor's path is reported as not local.
411 doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
413 doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
// Same stubbing as setupActorContextWithoutInitialCreateTransaction, plus a successful
// CreateTransactionReply (at transactionVersion) for a CreateTransaction message matching
// memberName and type sent to the created actor.
418 private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
419 TransactionType type, int transactionVersion) {
420 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
422 doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
423 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
424 eqCreateTransaction(memberName, type));
// Convenience overload that uses DataStoreVersions.CURRENT_VERSION as the transaction version.
429 private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
430 return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
// Waits up to 5 seconds on the future and fails the test unless a ReadFailedException is thrown.
// NOTE(review): the catch body is not visible in this excerpt; going by the method name it
// presumably rethrows the exception's cause — confirm.
434 private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
438 future.checkedGet(5, TimeUnit.SECONDS);
439 fail("Expected ReadFailedException");
440 } catch(ReadFailedException e) {
// Reads TEST_PATH twice: first against a reply carrying null data (Optional must be absent),
// then against a reply carrying a container node (Optional must hold that node).
446 public void testRead() throws Exception {
447 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
449 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
// First reply: no data.
452 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
453 eq(actorSelection(actorRef)), eqSerializedReadData());
455 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
456 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
458 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
460 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Re-stub the same operation so the second read returns expectedNode.
462 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
463 eq(actorSelection(actorRef)), eqSerializedReadData());
465 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
467 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
469 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
// A read whose reply is a plain Object (not a read reply) must surface as ReadFailedException.
472 @Test(expected = ReadFailedException.class)
473 public void testReadWithInvalidReplyMessageType() throws Exception {
474 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
476 doReturn(Futures.successful(new Object())).when(mockActorContext).
477 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
479 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
482 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
// The read operation's future fails with TestException; the test expects that exception to
// come out of propagateReadFailedExceptionCause.
485 @Test(expected = TestException.class)
486 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
487 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
489 doReturn(Futures.failed(new TestException())).when(mockActorContext).
490 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
492 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
495 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
// Shared driver for tests where setting up the transaction fails with exToThrow. It stubs the
// failure, creates a READ_ONLY proxy, runs the operation supplied by invoker and passes the
// resulting future to propagateReadFailedExceptionCause.
// NOTE(review): parts of the branch structure are not visible in this excerpt.
498 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
500 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
// A PrimaryNotFoundException is injected at the primary-shard lookup.
502 if (exToThrow instanceof PrimaryNotFoundException) {
503 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
// Successful primary-shard lookup resolving to the DoNothingActor.
505 doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
506 when(mockActorContext).findPrimaryShardAsync(anyString());
// Any operation sent to any actor selection fails with exToThrow.
509 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
510 any(ActorSelection.class), any());
512 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
514 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
// Runs testExceptionOnInitialCreateTransaction with an Invoker that performs read(TEST_PATH).
517 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
518 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
520 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
521 return proxy.read(TestModel.TEST_PATH);
// A read must propagate PrimaryNotFoundException when the primary-shard lookup fails with it.
526 @Test(expected = PrimaryNotFoundException.class)
527 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
528 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
// A read must propagate the datastore TimeoutException injected during transaction setup.
531 @Test(expected = TimeoutException.class)
532 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
533 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
534 new Exception("reason")));
// A read must propagate an arbitrary exception (TestException) injected during transaction setup.
537 @Test(expected = TestException.class)
538 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
539 testReadWithExceptionOnInitialCreateTransaction(new TestException());
// Issues a write (succeeds) and a delete (fails with TestException) before a read. The test
// expects TestException and verifies the read itself was never sent to the actor.
542 @Test(expected = TestException.class)
543 public void testReadWithPriorRecordingOperationFailure() throws Throwable {
544 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
546 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
548 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
549 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
// The delete is the prior operation that fails.
551 doReturn(Futures.failed(new TestException())).when(mockActorContext).
552 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
554 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
555 eq(actorSelection(actorRef)), eqSerializedReadData());
557 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
560 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
562 transactionProxy.delete(TestModel.TEST_PATH);
565 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
// times(0): the read operation must not have been executed.
567 verify(mockActorContext, times(0)).executeOperationAsync(
568 eq(actorSelection(actorRef)), eqSerializedReadData());
// After a successful write, a read of the same path must return the node supplied by the
// stubbed read reply.
573 public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
574 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
576 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
578 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
579 eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
581 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
582 eq(actorSelection(actorRef)), eqSerializedReadData());
584 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
587 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
589 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
590 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
592 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
594 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
// read() must throw IllegalStateException on this proxy.
// NOTE(review): the transaction type passed to the constructor is not visible in this excerpt.
597 @Test(expected=IllegalStateException.class)
598 public void testReadPreConditionCheck() {
600 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
603 transactionProxy.read(TestModel.TEST_PATH);
// The CreateTransaction request is answered with a plain Object instead of a proper reply;
// the test expects IllegalArgumentException to come out of propagateReadFailedExceptionCause.
606 @Test(expected=IllegalArgumentException.class)
607 public void testInvalidCreateTransactionReply() throws Throwable {
608 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
610 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
611 actorSelection(actorRef.path().toString());
613 doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
614 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// The invalid reply to the create-transaction request.
616 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
617 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
619 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
621 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
// Calls exists(TEST_PATH) twice: against a reply of false, then against a reply of true, and
// checks the returned value each time.
625 public void testExists() throws Exception {
626 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
628 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
631 doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
632 eq(actorSelection(actorRef)), eqSerializedDataExists());
634 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
636 assertEquals("Exists response", false, exists);
// Re-stub the same operation so the second call reports true.
638 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
639 eq(actorSelection(actorRef)), eqSerializedDataExists());
641 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
643 assertEquals("Exists response", true, exists);
// exists() must propagate PrimaryNotFoundException when the primary-shard lookup fails with it.
646 @Test(expected = PrimaryNotFoundException.class)
647 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
648 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
650 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
651 return proxy.exists(TestModel.TEST_PATH);
// An exists() call whose reply is a plain Object must surface as ReadFailedException.
656 @Test(expected = ReadFailedException.class)
657 public void testExistsWithInvalidReplyMessageType() throws Exception {
658 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
660 doReturn(Futures.successful(new Object())).when(mockActorContext).
661 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
663 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
666 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
// The exists operation's future fails with TestException; the test expects that exception to
// come out of propagateReadFailedExceptionCause.
669 @Test(expected = TestException.class)
670 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
671 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
673 doReturn(Futures.failed(new TestException())).when(mockActorContext).
674 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
676 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
679 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
// Issues a write (succeeds) and a delete (fails with TestException) before exists(). The test
// expects TestException and verifies the exists operation itself was never sent to the actor.
682 @Test(expected = TestException.class)
683 public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
684 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
686 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
688 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
689 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
// The delete is the prior operation that fails.
691 doReturn(Futures.failed(new TestException())).when(mockActorContext).
692 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
694 doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
695 eq(actorSelection(actorRef)), eqSerializedDataExists());
697 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
700 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
702 transactionProxy.delete(TestModel.TEST_PATH);
705 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
// times(0): the exists operation must not have been executed.
707 verify(mockActorContext, times(0)).executeOperationAsync(
708 eq(actorSelection(actorRef)), eqSerializedDataExists());
// After a successful write, exists() on the same path must return the value from the stubbed
// reply (true).
713 public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
714 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
716 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
718 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
719 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
721 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
722 eq(actorSelection(actorRef)), eqSerializedDataExists());
724 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
727 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
729 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
731 assertEquals("Exists response", true, exists);
// exists() must throw IllegalStateException on this proxy.
// NOTE(review): the transaction type passed to the constructor is not visible in this excerpt.
734 @Test(expected=IllegalStateException.class)
735 public void testExistsPreConditionCheck() {
737 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
740 transactionProxy.exists(TestModel.TEST_PATH);
// Checks a list of recorded-operation futures against the expected result types, in order.
// For an expected type assignable to Throwable, awaiting the future (5 s limit) must throw;
// for any other expected type, the awaited result's class must equal it.
743 private void verifyRecordingOperationFutures(List<Future<Object>> futures,
744 Class<?>... expResultTypes) throws Exception {
// Exactly one future is expected per entry in expResultTypes.
745 assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
748 for( Future<Object> future: futures) {
749 assertNotNull("Recording operation Future is null", future);
// i is the running index into expResultTypes (its declaration is not visible in this excerpt).
751 Class<?> expResultType = expResultTypes[i++];
752 if(Throwable.class.isAssignableFrom(expResultType)) {
754 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
755 fail("Expected exception from recording operation Future");
756 } catch(Exception e) {
760 assertEquals("Recording operation Future result type", expResultType,
761 Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
// write() must send a serialized WriteData for the node to the transaction actor and record
// one operation future that completes with a WriteDataReply.
767 public void testWrite() throws Exception {
768 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
770 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
772 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
773 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
775 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
778 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
780 verify(mockActorContext).executeOperationAsync(
781 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
783 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
784 WriteDataReply.class);
// Exercises a write issued from inside a read callback. The CreateTransaction reply is held
// back in a Promise, so the read is requested before the transaction exists; the promise is
// completed afterwards and the read's success callback then performs the write.
788 public void testWriteAfterAsyncRead() throws Throwable {
789 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
// Not-yet-completed promise returned for the create-transaction request.
791 Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
792 doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
793 eq(getSystem().actorSelection(actorRef.path())),
794 eqCreateTransaction(memberName, READ_WRITE));
796 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
797 eq(actorSelection(actorRef)), eqSerializedReadData());
799 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
801 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
802 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
804 final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
// Latch released by the read callback; caughtEx is later rethrown by the test if it was set.
806 final CountDownLatch readComplete = new CountDownLatch(1);
807 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
808 com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
809 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
811 public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
// The write under test, issued from within the read callback.
813 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
814 } catch (Exception e) {
817 readComplete.countDown();
822 public void onFailure(Throwable t) {
824 readComplete.countDown();
// Completing the promise supplies the create-transaction reply.
828 createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
// NOTE(review): the boolean result of awaitUninterruptibly is discarded, so a 5 s timeout is
// not reported by this line itself.
830 Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
832 if(caughtEx.get() != null) {
833 throw caughtEx.get();
836 verify(mockActorContext).executeOperationAsync(
837 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
839 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
840 WriteDataReply.class);
// write() must throw IllegalStateException on this proxy.
// NOTE(review): the transaction type passed to the constructor is not visible in this excerpt.
843 @Test(expected=IllegalStateException.class)
844 public void testWritePreConditionCheck() {
846 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
849 transactionProxy.write(TestModel.TEST_PATH,
850 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// A write issued after ready() has been called must throw IllegalStateException.
853 @Test(expected=IllegalStateException.class)
854 public void testWriteAfterReadyPreConditionCheck() {
856 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
859 transactionProxy.ready();
861 transactionProxy.write(TestModel.TEST_PATH,
862 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// merge() must send a serialized MergeData for the node to the transaction actor and record
// one operation future that completes with a MergeDataReply.
866 public void testMerge() throws Exception {
867 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
869 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
871 doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
872 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
874 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
876 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
878 verify(mockActorContext).executeOperationAsync(
879 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
881 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
882 MergeDataReply.class);
// Performs a single delete() on the proxy. The mock actor context is stubbed to answer
// a serialized delete request; the test verifies the request reached the transaction
// actor and passes the recorded operation futures to verifyRecordingOperationFutures
// with DeleteDataReply as the expected reply type.
// NOTE(review): the constructor's transaction-type argument is not visible here; the
// actor context is set up for WRITE_ONLY, so presumably it matches — confirm.
886 public void testDelete() throws Exception {
887 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
889 doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
890 eq(actorSelection(actorRef)), eqSerializedDeleteData());
892 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
895 transactionProxy.delete(TestModel.TEST_PATH);
897 verify(mockActorContext).executeOperationAsync(
898 eq(actorSelection(actorRef)), eqSerializedDeleteData());
900 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
901 DeleteDataReply.class);
// Checks the cohort futures held by a ThreePhaseCommitCohortProxy against the expected
// replies, matching them up by position.
//   proxy      - cohort proxy whose getCohortFutures() are inspected
//   expReplies - one expected value per cohort future; the counts must be equal
// An expected ActorSelection means the future must yield an equal selection within
// 5 seconds. Any other expected value means awaiting the future must throw; if it
// returns normally the test fails.
// NOTE(review): what happens inside the catch block (e.g. whether the exception type is
// compared with the expected value) is not visible in this view.
904 private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
905 Object... expReplies) throws Exception {
906 assertEquals("getReadyOperationFutures size", expReplies.length,
907 proxy.getCohortFutures().size());
910 for( Future<ActorSelection> future: proxy.getCohortFutures()) {
911 assertNotNull("Ready operation Future is null", future);
913 Object expReply = expReplies[i++];
914 if(expReply instanceof ActorSelection) {
915 ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
916 assertEquals("Cohort actor path", expReply, actual);
918 // Expecting exception.
920 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
921 fail("Expected exception from ready operation Future");
922 } catch(Exception e) {
// Exercises ready() after a read and a write. Serialized read (with null data), write
// and ready replies are stubbed on the mock actor context; the ready reply carries the
// transaction actor's own path. The test asserts that ready() returns a
// ThreePhaseCommitCohortProxy, that the recorded operation futures are checked against
// WriteDataReply only, and that the single cohort future resolves to the selection for
// the actor's path.
// NOTE(review): the constructor's transaction-type argument is not visible here; the
// actor context is set up for READ_WRITE — confirm they match.
930 public void testReady() throws Exception {
931 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
933 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
935 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
936 eq(actorSelection(actorRef)), eqSerializedReadData());
938 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
939 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
941 doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
942 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
944 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
947 transactionProxy.read(TestModel.TEST_PATH);
949 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
951 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
953 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
955 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
957 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
958 WriteDataReply.class);
960 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
// Shared scenario for the version-compatibility tests: read, write, merge and ready
// against a transaction actor that was set up with the given datastore version.
//   version - passed to the actor-context setup and to the reply/matcher helpers
// Declared to return an ActorRef; the callers use it to check resolvePath() on the mock.
// NOTE(review): the return statement is not visible in this view — presumably it
// returns actorRef; confirm.
963 private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
964 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
965 READ_WRITE, version);
967 NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Version-specific replies for read, write and merge.
969 doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
970 eq(actorSelection(actorRef)), eqSerializedReadData());
972 doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
973 eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
975 doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
976 eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
978 doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
979 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
// resolvePath() is stubbed to hand back the actor path unchanged when both arguments
// are that same path.
981 doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
982 eq(actorRef.path().toString()));
984 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
986 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
987 get(5, TimeUnit.SECONDS);
989 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
990 assertEquals("Response NormalizedNode", testNode, readOptional.get());
992 transactionProxy.write(TestModel.TEST_PATH, testNode);
994 transactionProxy.merge(TestModel.TEST_PATH, testNode);
996 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
998 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1000 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
// The write and the merge are expected among the recorded futures, in that order.
1002 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1003 ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
1005 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
// Runs the shared compatibility scenario with DataStoreVersions.BASE_HELIUM_VERSION and
// then verifies that resolvePath() was called on the mock actor context with the
// transaction actor's path as both arguments.
1011 public void testCompatibilityWithBaseHeliumVersion() throws Exception {
1012 ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
1014 verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
1015 eq(actorRef.path().toString()));
// Runs the shared compatibility scenario with DataStoreVersions.HELIUM_1_VERSION and
// then verifies that resolvePath() was never called with the transaction actor's path.
1019 public void testCompatibilityWithHeliumR1Version() throws Exception {
1020 ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
1022 verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
1023 eq(actorRef.path().toString()));
// ready() when one of the earlier recorded operations has failed: the merge is stubbed
// to succeed, the write is stubbed to return a future failed with TestException, and
// the ready request itself is stubbed to succeed. The cohort future is expected to
// fail (a non-ActorSelection expectation is passed to verifyCohortFutures), and the
// recorded futures are checked against MergeDataReply followed by TestException.
// NOTE(review): the constructor's transaction-type argument is not visible here.
1027 public void testReadyWithRecordingOperationFailure() throws Exception {
1028 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1030 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1032 doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1033 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
1035 doReturn(Futures.failed(new TestException())).when(mockActorContext).
1036 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
1038 doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
1039 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
// The transaction actor's path is explicitly reported as not local.
1041 doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
1043 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1046 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1048 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1050 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1052 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1054 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1056 verifyCohortFutures(proxy, TestException.class);
1058 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1059 MergeDataReply.class, TestException.class);
// ready() when the ready request itself fails: the merge is stubbed to succeed, while
// the ReadyTransaction request is stubbed to return a future failed with TestException.
// The recorded futures are checked against MergeDataReply, and the cohort future is
// expected to fail (a non-ActorSelection expectation is passed to verifyCohortFutures).
// NOTE(review): the constructor's transaction-type argument is not visible here.
1063 public void testReadyWithReplyFailure() throws Exception {
1064 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1066 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1068 doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1069 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
1071 doReturn(Futures.failed(new TestException())).when(mockActorContext).
1072 executeOperationAsync(eq(actorSelection(actorRef)),
1073 isA(ReadyTransaction.SERIALIZABLE_CLASS));
1075 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1078 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1080 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1082 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1084 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1086 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1087 MergeDataReply.class);
1089 verifyCohortFutures(proxy, TestException.class);
// ready() when the primary shard lookup fails: findPrimaryShardAsync is stubbed to
// return a future failed with PrimaryNotFoundException for any shard name. A merge, a
// write and a delete are issued before ready(); the cohort future is then expected to
// fail (a non-ActorSelection expectation is passed to verifyCohortFutures).
// NOTE(review): the constructor's transaction-type argument is not visible here.
1093 public void testReadyWithInitialCreateTransactionFailure() throws Exception {
1095 doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
1096 mockActorContext).findPrimaryShardAsync(anyString());
1098 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1101 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1103 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1105 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1107 transactionProxy.delete(TestModel.TEST_PATH);
1109 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1111 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1113 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1115 verifyCohortFutures(proxy, PrimaryNotFoundException.class);
// ready() when the reply to the ready request has an unexpected type: the write is
// stubbed to succeed, while the ReadyTransaction request is stubbed to complete
// successfully with a plain Object. The cohort future is expected to fail (a
// non-ActorSelection expectation, IllegalArgumentException.class, is passed to
// verifyCohortFutures).
// NOTE(review): the constructor's transaction-type argument is not visible here.
1119 public void testReadyWithInvalidReplyMessageType() throws Exception {
1120 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
1122 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1124 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1125 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
1127 doReturn(Futures.successful(new Object())).when(mockActorContext).
1128 executeOperationAsync(eq(actorSelection(actorRef)),
1129 isA(ReadyTransaction.SERIALIZABLE_CLASS));
1131 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1134 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1136 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1138 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1140 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1142 verifyCohortFutures(proxy, IllegalArgumentException.class);
// Checks that a READ_ONLY proxy exposes a non-null identifier whose string form starts
// with the test's memberName.
1146 public void testGetIdentifier() {
1147 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
1148 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1149 TransactionProxy.TransactionType.READ_ONLY);
1151 Object id = transactionProxy.getIdentifier();
1152 assertNotNull("getIdentifier returned null", id);
1153 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
// Issues a read and then close() on the proxy, and verifies that a message of the
// CloseTransaction serializable class was sent to the transaction actor through
// sendOperationAsync.
// NOTE(review): the constructor's transaction-type argument is not visible here; the
// actor context is set up for READ_WRITE — confirm they match.
1157 public void testClose() throws Exception{
1158 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1160 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
1161 eq(actorSelection(actorRef)), eqSerializedReadData());
1163 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1166 transactionProxy.read(TestModel.TEST_PATH);
1168 transactionProxy.close();
1170 verify(mockActorContext).sendOperationAsync(
1171 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
1176 * Method to test a local Tx actor. The Tx paths are matched to decide if the
1177 * Tx actor is local or not. This is done by mocking the Tx actor path
1178 * and the caller paths and ensuring that the paths have the remote-address format.
1180 * Note: Since the default akka provider for tests is not a RemoteActorRefProvider,
1181 * the paths returned for the actors in all the tests are not qualified remote paths
1182 * and hence are treated as non-local/remote actors. In short, all tests except
1183 * the few below run against remote actors.
// Read and exists against a transaction actor that the mock reports as local. Three
// cases are checked in sequence on one READ_ONLY proxy: a read whose reply carries no
// node, a read whose reply carries a node, and an exists() whose reply is true.
// NOTE(review): this test stubs eqReadData()/eqDataExists() rather than the
// eqSerialized* matchers used by the other tests — presumably because messages to a
// local actor are sent unserialized; confirm against TransactionProxy.
1188 public void testLocalTxActorRead() throws Exception {
1189 ActorSystem actorSystem = getSystem();
1190 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1192 doReturn(actorSystem.actorSelection(shardActorRef.path())).
1193 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1195 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1196 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// The create-transaction reply names a remote-format tx actor path, which isPathLocal()
// is stubbed below to report as local.
1198 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1199 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1200 .setTransactionId("txn-1")
1201 .setTransactionActorPath(actorPath)
1204 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1205 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1206 eqCreateTransaction(memberName, READ_ONLY));
1208 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1210 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
1212 // negative test case with null as the reply
1213 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
1214 any(ActorSelection.class), eqReadData());
1216 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1217 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1219 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
1221 // test case with node as read data reply
1222 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1224 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1225 any(ActorSelection.class), eqReadData());
1227 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1229 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1231 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
1233 // test for local data exists
1234 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1235 any(ActorSelection.class), eqDataExists());
1237 boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1239 assertEquals("Exists response", true, exists);
// Write, merge, delete and ready against a transaction actor that the mock reports as
// local, all on one WRITE_ONLY proxy. Each modification is stubbed and then verified
// to have been sent; afterwards the recorded futures are checked against
// WriteDataReply, MergeDataReply and DeleteDataReply in that order, and the cohort
// future is expected to resolve to the selection for the shard actor's path.
// NOTE(review): this test stubs eqWriteData/eqMergeData/eqDeleteData and
// isA(ReadyTransaction.class) rather than the serialized variants used by the other
// tests — presumably because messages to a local actor are sent unserialized; confirm.
1243 public void testLocalTxActorWrite() throws Exception {
1244 ActorSystem actorSystem = getSystem();
1245 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1247 doReturn(actorSystem.actorSelection(shardActorRef.path())).
1248 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1250 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1251 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// The create-transaction reply names a remote-format tx actor path, which isPathLocal()
// is stubbed below to report as local.
1253 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1254 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1255 .setTransactionId("txn-1")
1256 .setTransactionActorPath(actorPath)
1259 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1260 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1261 eqCreateTransaction(memberName, WRITE_ONLY));
1263 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1265 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1267 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1268 any(ActorSelection.class), eqWriteData(nodeToWrite));
1270 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
1271 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1273 verify(mockActorContext).executeOperationAsync(
1274 any(ActorSelection.class), eqWriteData(nodeToWrite));
1276 //testing local merge
1277 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1278 any(ActorSelection.class), eqMergeData(nodeToWrite));
1280 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1282 verify(mockActorContext).executeOperationAsync(
1283 any(ActorSelection.class), eqMergeData(nodeToWrite));
1286 //testing local delete
1287 doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1288 any(ActorSelection.class), eqDeleteData());
1290 transactionProxy.delete(TestModel.TEST_PATH);
1292 verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
1294 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1295 WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
// The ready reply carries the shard actor's path, which the cohort check below expects.
1298 doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
1299 any(ActorSelection.class), isA(ReadyTransaction.class));
1301 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1303 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1305 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1307 verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
// Callback carrying the proxy calls that the timing helpers throttleOperation and
// completeOperation execute; run() receives the TransactionProxy those helpers construct.
1310 private static interface TransactionProxyOperation {
1311 void run(TransactionProxy transactionProxy);
// Convenience overload: runs the throttling check with an outstanding-operation limit
// of 1 and shardFound set to true.
1314 private void throttleOperation(TransactionProxyOperation operation) {
1315 throttleOperation(operation, 1, true);
// Runs the given operation against a READ_WRITE TransactionProxy backed by the mocked
// actor context and asserts that the call took strictly longer than the datastore
// context's operation timeout (the throttled case).
//   operation           - the proxy calls to time
//   outstandingOpsLimit - value the mock returns from getTransactionOutstandingOperationLimit()
//   shardFound          - not referenced on any line visible in this view; see the note
//                         on the findPrimaryShardAsync stubs below
1318 private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
1319 ActorSystem actorSystem = getSystem();
1320 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1322 doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
1324 doReturn(actorSystem.actorSelection(shardActorRef.path())).
1325 when(mockActorContext).actorSelection(shardActorRef.path().toString());
// NOTE(review): a successful and a failed findPrimaryShardAsync stub appear back to
// back here; presumably they are the two arms of a branch on shardFound whose if/else
// lines are missing from this view — confirm against the full file.
1328 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1329 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1331 doReturn(Futures.failed(new Exception("not found")))
1332 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// The create-transaction reply names a tx actor path that isPathLocal() is stubbed
// below to report as local.
1335 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1336 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1337 .setTransactionId("txn-1")
1338 .setTransactionActorPath(actorPath)
1341 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1342 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1343 eqCreateTransaction(memberName, READ_WRITE));
1345 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1347 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
// Time the operation; the elapsed nanoseconds must exceed the operation timeout
// (configured in seconds, converted to nanoseconds for the comparison).
1349 long start = System.nanoTime();
1351 operation.run(transactionProxy);
1353 long end = System.nanoTime();
1355 Assert.assertTrue(String.format("took less time than expected %s was %s",
1356 TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
1357 (end-start)), (end - start) > TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
// Convenience overload: runs the completion check with shardFound set to true.
1361 private void completeOperation(TransactionProxyOperation operation){
1362 completeOperation(operation, true);
// Counterpart of throttleOperation: runs the given operation against a READ_WRITE
// TransactionProxy backed by the mocked actor context (outstanding-operation limit
// stubbed to 1) and asserts that the call finished within the datastore context's
// operation timeout, i.e. that it was not held up.
//   operation  - the proxy calls to time
//   shardFound - not referenced on any line visible in this view; see the note on the
//                findPrimaryShardAsync stubs below
1365 private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
1366 ActorSystem actorSystem = getSystem();
1367 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1369 doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
1371 doReturn(actorSystem.actorSelection(shardActorRef.path())).
1372 when(mockActorContext).actorSelection(shardActorRef.path().toString());
// NOTE(review): a successful and a failed findPrimaryShardAsync stub appear back to
// back here; presumably they are the two arms of a branch on shardFound whose if/else
// lines are missing from this view — confirm against the full file.
1375 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1376 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1378 doReturn(Futures.failed(new Exception("not found")))
1379 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// The create-transaction reply names a tx actor path that isPathLocal() is stubbed
// below to report as local.
1382 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1383 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1384 .setTransactionId("txn-1")
1385 .setTransactionActorPath(actorPath)
1388 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1389 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1390 eqCreateTransaction(memberName, READ_WRITE));
1392 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
1394 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
// Time the operation; the elapsed nanoseconds must not exceed the operation timeout
// (configured in seconds, converted to nanoseconds for the comparison).
1396 long start = System.nanoTime();
1398 operation.run(transactionProxy);
1400 long end = System.nanoTime();
1402 Assert.assertTrue(String.format("took more time than expected %s was %s",
1403 TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
1404 (end-start)), (end - start) <= TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
// Issues two writes whose replies never complete (incompleteFuture()) through
// throttleOperation, which asserts that the calls take longer than the operation timeout.
// NOTE(review): shardFound is not referenced on any visible line, and the visible body
// matches testWriteThrottlingWhenShardFound. A JUnit 4 test method cannot take
// parameters, so this is presumably an unannotated leftover — confirm whether it is
// still needed.
1407 public void testWriteThrottling(boolean shardFound){
1409 throttleOperation(new TransactionProxyOperation() {
1411 public void run(TransactionProxy transactionProxy) {
1412 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1414 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1415 any(ActorSelection.class), eqWriteData(nodeToWrite));
1417 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1419 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// Issues two writes whose replies never complete (incompleteFuture()) through
// throttleOperation, which asserts that the calls take longer than the operation timeout.
1425 public void testWriteThrottlingWhenShardFound(){
1426 throttleOperation(new TransactionProxyOperation() {
1428 public void run(TransactionProxy transactionProxy) {
1429 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1431 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1432 any(ActorSelection.class), eqWriteData(nodeToWrite));
1434 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1436 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// Issues two writes whose replies never complete through completeOperation, which
// asserts that the calls return within the operation timeout.
// NOTE(review): the trailing arguments of the completeOperation call are not visible
// here; the method name suggests shardFound is passed as false — confirm.
1443 public void testWriteThrottlingWhenShardNotFound(){
1444 // Confirm that there is no throttling when the Shard is not found
1445 completeOperation(new TransactionProxyOperation() {
1447 public void run(TransactionProxy transactionProxy) {
1448 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1450 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1451 any(ActorSelection.class), eqWriteData(nodeToWrite));
1453 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1455 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// Issues two writes with a completed write reply stubbed, through completeOperation,
// which asserts that the calls return within the operation timeout.
// NOTE(review): this stub uses writeSerializedDataReply()/eqSerializedWriteData, whereas
// the write-throttling tests stub eqWriteData and testMergeCompletion stubs the
// unserialized merge variant — confirm the serialized matcher is intended here.
1463 public void testWriteCompletion(){
1464 completeOperation(new TransactionProxyOperation() {
1466 public void run(TransactionProxy transactionProxy) {
1467 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1469 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
1470 any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
1472 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1474 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// Issues two merges whose replies never complete (incompleteFuture()) through
// throttleOperation, which asserts that the calls take longer than the operation timeout.
1481 public void testMergeThrottlingWhenShardFound(){
1483 throttleOperation(new TransactionProxyOperation() {
1485 public void run(TransactionProxy transactionProxy) {
1486 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1488 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1489 any(ActorSelection.class), eqMergeData(nodeToMerge));
1491 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1493 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
// Issues two merges whose replies never complete through completeOperation, which
// asserts that the calls return within the operation timeout.
// NOTE(review): the trailing arguments of the completeOperation call are not visible
// here; the method name suggests shardFound is passed as false — confirm.
1499 public void testMergeThrottlingWhenShardNotFound(){
1501 completeOperation(new TransactionProxyOperation() {
1503 public void run(TransactionProxy transactionProxy) {
1504 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1506 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1507 any(ActorSelection.class), eqMergeData(nodeToMerge));
1509 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1511 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
// Issues two merges with a completed merge reply stubbed, through completeOperation,
// which asserts that the calls return within the operation timeout.
1517 public void testMergeCompletion(){
1518 completeOperation(new TransactionProxyOperation() {
1520 public void run(TransactionProxy transactionProxy) {
1521 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1523 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1524 any(ActorSelection.class), eqMergeData(nodeToMerge));
1526 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1528 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
// Issues two deletes whose replies never complete (incompleteFuture()) through
// throttleOperation, which asserts that the calls take longer than the operation timeout.
1535 public void testDeleteThrottlingWhenShardFound(){
1537 throttleOperation(new TransactionProxyOperation() {
1539 public void run(TransactionProxy transactionProxy) {
1540 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1541 any(ActorSelection.class), eqDeleteData());
1543 transactionProxy.delete(TestModel.TEST_PATH);
1545 transactionProxy.delete(TestModel.TEST_PATH);
// Issues two deletes whose replies never complete through completeOperation, which
// asserts that the calls return within the operation timeout.
// NOTE(review): the trailing arguments of the completeOperation call are not visible
// here; the method name suggests shardFound is passed as false — confirm.
1552 public void testDeleteThrottlingWhenShardNotFound(){
1554 completeOperation(new TransactionProxyOperation() {
1556 public void run(TransactionProxy transactionProxy) {
1557 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1558 any(ActorSelection.class), eqDeleteData());
1560 transactionProxy.delete(TestModel.TEST_PATH);
1562 transactionProxy.delete(TestModel.TEST_PATH);
// Issues two deletes with a completed delete reply stubbed, through completeOperation,
// which asserts that the calls return within the operation timeout.
1568 public void testDeleteCompletion(){
1569 completeOperation(new TransactionProxyOperation() {
1571 public void run(TransactionProxy transactionProxy) {
1572 doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1573 any(ActorSelection.class), eqDeleteData());
1575 transactionProxy.delete(TestModel.TEST_PATH);
1577 transactionProxy.delete(TestModel.TEST_PATH);
// Issues two reads whose replies never complete (incompleteFuture()) through
// throttleOperation, which asserts that the calls take longer than the operation timeout.
1584 public void testReadThrottlingWhenShardFound(){
1586 throttleOperation(new TransactionProxyOperation() {
1588 public void run(TransactionProxy transactionProxy) {
1589 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1590 any(ActorSelection.class), eqReadData());
1592 transactionProxy.read(TestModel.TEST_PATH);
1594 transactionProxy.read(TestModel.TEST_PATH);
// Issues two reads whose replies never complete through completeOperation, which
// asserts that the calls return within the operation timeout.
// NOTE(review): the trailing arguments of the completeOperation call are not visible
// here; the method name suggests shardFound is passed as false — confirm.
1600 public void testReadThrottlingWhenShardNotFound(){
1602 completeOperation(new TransactionProxyOperation() {
1604 public void run(TransactionProxy transactionProxy) {
1605 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1606 any(ActorSelection.class), eqReadData());
1608 transactionProxy.read(TestModel.TEST_PATH);
1610 transactionProxy.read(TestModel.TEST_PATH);
// Issues two reads with a completed read reply (carrying the test container node)
// stubbed, through completeOperation, which asserts that the calls return within the
// operation timeout.
1617 public void testReadCompletion(){
1618 completeOperation(new TransactionProxyOperation() {
1620 public void run(TransactionProxy transactionProxy) {
1621 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1623 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1624 any(ActorSelection.class), eqReadData());
1626 transactionProxy.read(TestModel.TEST_PATH);
1628 transactionProxy.read(TestModel.TEST_PATH);
// Issues two exists() calls whose replies never complete (incompleteFuture()) through
// throttleOperation, which asserts that the calls take longer than the operation timeout.
1635 public void testExistsThrottlingWhenShardFound(){
1637 throttleOperation(new TransactionProxyOperation() {
1639 public void run(TransactionProxy transactionProxy) {
1640 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1641 any(ActorSelection.class), eqDataExists());
1643 transactionProxy.exists(TestModel.TEST_PATH);
1645 transactionProxy.exists(TestModel.TEST_PATH);
// Issues two exists() calls whose replies never complete through completeOperation,
// which asserts that the calls return within the operation timeout.
// NOTE(review): the trailing arguments of the completeOperation call are not visible
// here; the method name suggests shardFound is passed as false — confirm.
1651 public void testExistsThrottlingWhenShardNotFound(){
1653 completeOperation(new TransactionProxyOperation() {
1655 public void run(TransactionProxy transactionProxy) {
1656 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1657 any(ActorSelection.class), eqDataExists());
1659 transactionProxy.exists(TestModel.TEST_PATH);
1661 transactionProxy.exists(TestModel.TEST_PATH);
// Issues two exists() calls with a completed "true" reply stubbed, through
// completeOperation, which asserts that the calls return within the operation timeout.
1668 public void testExistsCompletion(){
1669 completeOperation(new TransactionProxyOperation() {
1671 public void run(TransactionProxy transactionProxy) {
1672 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1673 any(ActorSelection.class), eqDataExists());
1675 transactionProxy.exists(TestModel.TEST_PATH);
1677 transactionProxy.exists(TestModel.TEST_PATH);
// Issues a write followed by ready(), with both the write reply and the
// ReadyTransaction reply stubbed as futures that never complete, through
// throttleOperation, which asserts that the calls take longer than the operation timeout.
// NOTE(review): the trailing arguments of the throttleOperation call (if any) are not
// visible in this view.
1684 public void testReadyThrottling(){
1686 throttleOperation(new TransactionProxyOperation() {
1688 public void run(TransactionProxy transactionProxy) {
1689 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1691 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1692 any(ActorSelection.class), eqWriteData(nodeToWrite));
1694 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1695 any(ActorSelection.class), any(ReadyTransaction.class));
1697 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1699 transactionProxy.ready();
// Issues two writes followed by ready() through throttleOperation, which asserts that
// the calls take longer than the operation timeout. The write of the test container
// node is stubbed to complete; the write of the cars node and the ReadyTransaction
// request are stubbed with futures that never complete.
// NOTE(review): both writes target TestModel.TEST_PATH, and the trailing arguments of
// the throttleOperation call are not visible in this view — how two transaction
// contexts come into play cannot be confirmed from here.
1705 public void testReadyThrottlingWithTwoTransactionContexts(){
1707 throttleOperation(new TransactionProxyOperation() {
1709 public void run(TransactionProxy transactionProxy) {
1710 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1711 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1713 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1714 any(ActorSelection.class), eqWriteData(nodeToWrite));
1716 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1717 any(ActorSelection.class), eqWriteData(carsNode));
1719 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1720 any(ActorSelection.class), any(ReadyTransaction.class));
1722 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1724 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1726 transactionProxy.ready();