1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSelection;
5 import akka.actor.ActorSystem;
6 import akka.actor.Props;
7 import akka.dispatch.Futures;
8 import akka.testkit.JavaTestKit;
9 import com.google.common.base.Optional;
10 import com.google.common.collect.ImmutableMap;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.typesafe.config.Config;
13 import com.typesafe.config.ConfigFactory;
14 import org.junit.AfterClass;
15 import org.junit.Before;
16 import org.junit.BeforeClass;
17 import org.junit.Test;
18 import org.mockito.ArgumentMatcher;
19 import org.mockito.Mock;
20 import org.mockito.MockitoAnnotations;
21 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
22 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
23 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
24 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
27 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
28 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
29 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
30 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
31 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
42 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
43 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
44 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
45 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
46 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
50 import scala.concurrent.Await;
51 import scala.concurrent.Future;
52 import scala.concurrent.duration.Duration;
53 import java.io.IOException;
54 import java.util.List;
55 import java.util.concurrent.TimeUnit;
56 import static org.junit.Assert.assertEquals;
57 import static org.junit.Assert.assertNotNull;
58 import static org.junit.Assert.assertTrue;
59 import static org.junit.Assert.fail;
60 import static org.mockito.Matchers.any;
61 import static org.mockito.Matchers.anyString;
62 import static org.mockito.Mockito.argThat;
63 import static org.mockito.Mockito.doReturn;
64 import static org.mockito.Mockito.eq;
65 import static org.mockito.Mockito.isA;
66 import static org.mockito.Mockito.times;
67 import static org.mockito.Mockito.verify;
68 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
69 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
70 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
72 @SuppressWarnings("resource")
73 public class TransactionProxyTest {
75 @SuppressWarnings("serial")
76 static class TestException extends RuntimeException {
79 static interface Invoker {
80 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
83 private static ActorSystem system;
85 private final Configuration configuration = new MockConfiguration();
88 private ActorContext mockActorContext;
90 private SchemaContext schemaContext;
93 private ClusterWrapper mockClusterWrapper;
95 String memberName = "mock-member";
98 public static void setUpClass() throws IOException {
100 Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
101 put("akka.actor.default-dispatcher.type",
102 "akka.testkit.CallingThreadDispatcherConfigurator").build()).
103 withFallback(ConfigFactory.load());
104 system = ActorSystem.create("test", config);
108 public static void tearDownClass() throws IOException {
109 JavaTestKit.shutdownActorSystem(system);
115 MockitoAnnotations.initMocks(this);
117 schemaContext = TestModel.createTestContext();
119 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
121 doReturn(getSystem()).when(mockActorContext).getActorSystem();
122 doReturn(memberName).when(mockActorContext).getCurrentMemberName();
123 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
124 doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
125 doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
126 doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
128 ShardStrategyFactory.setConfiguration(configuration);
131 private ActorSystem getSystem() {
135 private CreateTransaction eqCreateTransaction(final String memberName,
136 final TransactionType type) {
137 ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
139 public boolean matches(Object argument) {
140 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
141 return obj.getTransactionId().startsWith(memberName) &&
142 obj.getTransactionType() == type.ordinal();
146 return argThat(matcher);
149 private DataExists eqSerializedDataExists() {
150 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
152 public boolean matches(Object argument) {
153 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
154 DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
158 return argThat(matcher);
161 private DataExists eqDataExists() {
162 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
164 public boolean matches(Object argument) {
165 return (argument instanceof DataExists) &&
166 ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
170 return argThat(matcher);
173 private ReadData eqSerializedReadData() {
174 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
176 public boolean matches(Object argument) {
177 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
178 ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
182 return argThat(matcher);
185 private ReadData eqReadData() {
186 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
188 public boolean matches(Object argument) {
189 return (argument instanceof ReadData) &&
190 ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
194 return argThat(matcher);
197 private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
198 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
200 public boolean matches(Object argument) {
201 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
205 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
206 return obj.getPath().equals(TestModel.TEST_PATH) &&
207 obj.getData().equals(nodeToWrite);
211 return argThat(matcher);
214 private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
215 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
217 public boolean matches(Object argument) {
218 if(argument instanceof WriteData) {
219 WriteData obj = (WriteData) argument;
220 return obj.getPath().equals(TestModel.TEST_PATH) &&
221 obj.getData().equals(nodeToWrite);
227 return argThat(matcher);
230 private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
231 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
233 public boolean matches(Object argument) {
234 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
238 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
239 return obj.getPath().equals(TestModel.TEST_PATH) &&
240 obj.getData().equals(nodeToWrite);
244 return argThat(matcher);
247 private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
248 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
250 public boolean matches(Object argument) {
251 if(argument instanceof MergeData) {
252 MergeData obj = ((MergeData) argument);
253 return obj.getPath().equals(TestModel.TEST_PATH) &&
254 obj.getData().equals(nodeToWrite);
261 return argThat(matcher);
264 private DeleteData eqSerializedDeleteData() {
265 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
267 public boolean matches(Object argument) {
268 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
269 DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
273 return argThat(matcher);
276 private DeleteData eqDeleteData() {
277 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
279 public boolean matches(Object argument) {
280 return argument instanceof DeleteData &&
281 ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
285 return argThat(matcher);
288 private Future<Object> readySerializedTxReply(String path) {
289 return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
292 private Future<Object> readyTxReply(String path) {
293 return Futures.successful((Object)new ReadyTransactionReply(path));
297 private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
298 return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
301 private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
302 return Futures.successful(new ReadDataReply(schemaContext, data));
305 private Future<Object> dataExistsSerializedReply(boolean exists) {
306 return Futures.successful(new DataExistsReply(exists).toSerializable());
309 private Future<DataExistsReply> dataExistsReply(boolean exists) {
310 return Futures.successful(new DataExistsReply(exists));
313 private Future<Object> writeSerializedDataReply() {
314 return Futures.successful(new WriteDataReply().toSerializable());
317 private Future<WriteDataReply> writeDataReply() {
318 return Futures.successful(new WriteDataReply());
321 private Future<Object> mergeSerializedDataReply() {
322 return Futures.successful(new MergeDataReply().toSerializable());
325 private Future<MergeDataReply> mergeDataReply() {
326 return Futures.successful(new MergeDataReply());
329 private Future<Object> deleteSerializedDataReply() {
330 return Futures.successful(new DeleteDataReply().toSerializable());
333 private Future<DeleteDataReply> deleteDataReply() {
334 return Futures.successful(new DeleteDataReply());
337 private ActorSelection actorSelection(ActorRef actorRef) {
338 return getSystem().actorSelection(actorRef.path());
341 private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
342 return CreateTransactionReply.newBuilder()
343 .setTransactionActorPath(actorRef.path().toString())
344 .setTransactionId("txn-1")
345 .setMessageVersion(transactionVersion)
349 private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
350 ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
351 doReturn(actorSystem.actorSelection(actorRef.path())).
352 when(mockActorContext).actorSelection(actorRef.path().toString());
354 doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
355 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
357 doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
358 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
359 eqCreateTransaction(memberName, type));
361 doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString());
366 private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
367 return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
371 private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
375 future.checkedGet(5, TimeUnit.SECONDS);
376 fail("Expected ReadFailedException");
377 } catch(ReadFailedException e) {
383 public void testRead() throws Exception {
384 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
386 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
389 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
390 eq(actorSelection(actorRef)), eqSerializedReadData());
392 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
393 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
395 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
397 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
399 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
400 eq(actorSelection(actorRef)), eqSerializedReadData());
402 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
404 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
406 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
409 @Test(expected = ReadFailedException.class)
410 public void testReadWithInvalidReplyMessageType() throws Exception {
411 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
413 doReturn(Futures.successful(new Object())).when(mockActorContext).
414 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
416 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
419 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
422 @Test(expected = TestException.class)
423 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
424 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
426 doReturn(Futures.failed(new TestException())).when(mockActorContext).
427 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
429 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
432 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
435 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
437 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
439 if (exToThrow instanceof PrimaryNotFoundException) {
440 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
442 doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
443 when(mockActorContext).findPrimaryShardAsync(anyString());
446 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
447 any(ActorSelection.class), any());
449 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
451 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
454 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
455 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
457 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
458 return proxy.read(TestModel.TEST_PATH);
463 @Test(expected = PrimaryNotFoundException.class)
464 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
465 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
468 @Test(expected = TimeoutException.class)
469 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
470 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
471 new Exception("reason")));
474 @Test(expected = TestException.class)
475 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
476 testReadWithExceptionOnInitialCreateTransaction(new TestException());
479 @Test(expected = TestException.class)
480 public void testReadWithPriorRecordingOperationFailure() throws Throwable {
481 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
483 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
485 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
486 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
488 doReturn(Futures.failed(new TestException())).when(mockActorContext).
489 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
491 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
492 eq(actorSelection(actorRef)), eqSerializedReadData());
494 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
497 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
499 transactionProxy.delete(TestModel.TEST_PATH);
502 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
504 verify(mockActorContext, times(0)).executeOperationAsync(
505 eq(actorSelection(actorRef)), eqSerializedReadData());
510 public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
511 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
513 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
515 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
516 eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
518 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
519 eq(actorSelection(actorRef)), eqSerializedReadData());
521 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
524 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
526 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
527 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
529 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
531 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
534 @Test(expected=IllegalStateException.class)
535 public void testReadPreConditionCheck() {
537 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
540 transactionProxy.read(TestModel.TEST_PATH);
543 @Test(expected=IllegalArgumentException.class)
544 public void testInvalidCreateTransactionReply() throws Throwable {
545 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
547 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
548 actorSelection(actorRef.path().toString());
550 doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
551 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
553 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
554 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
556 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
558 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
562 public void testExists() throws Exception {
563 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
565 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
568 doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
569 eq(actorSelection(actorRef)), eqSerializedDataExists());
571 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
573 assertEquals("Exists response", false, exists);
575 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
576 eq(actorSelection(actorRef)), eqSerializedDataExists());
578 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
580 assertEquals("Exists response", true, exists);
583 @Test(expected = PrimaryNotFoundException.class)
584 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
585 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
587 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
588 return proxy.exists(TestModel.TEST_PATH);
593 @Test(expected = ReadFailedException.class)
594 public void testExistsWithInvalidReplyMessageType() throws Exception {
595 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
597 doReturn(Futures.successful(new Object())).when(mockActorContext).
598 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
600 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
603 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
606 @Test(expected = TestException.class)
607 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
608 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
610 doReturn(Futures.failed(new TestException())).when(mockActorContext).
611 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
613 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
616 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
619 @Test(expected = TestException.class)
620 public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
621 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
623 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
625 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
626 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
628 doReturn(Futures.failed(new TestException())).when(mockActorContext).
629 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
631 doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
632 eq(actorSelection(actorRef)), eqSerializedDataExists());
634 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
637 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
639 transactionProxy.delete(TestModel.TEST_PATH);
642 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
644 verify(mockActorContext, times(0)).executeOperationAsync(
645 eq(actorSelection(actorRef)), eqSerializedDataExists());
650 public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
651 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
653 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
655 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
656 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
658 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
659 eq(actorSelection(actorRef)), eqSerializedDataExists());
661 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
664 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
666 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
668 assertEquals("Exists response", true, exists);
671 @Test(expected=IllegalStateException.class)
672 public void testxistsPreConditionCheck() {
674 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
677 transactionProxy.exists(TestModel.TEST_PATH);
680 private void verifyRecordingOperationFutures(List<Future<Object>> futures,
681 Class<?>... expResultTypes) throws Exception {
682 assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
685 for( Future<Object> future: futures) {
686 assertNotNull("Recording operation Future is null", future);
688 Class<?> expResultType = expResultTypes[i++];
689 if(Throwable.class.isAssignableFrom(expResultType)) {
691 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
692 fail("Expected exception from recording operation Future");
693 } catch(Exception e) {
697 assertEquals("Recording operation Future result type", expResultType,
698 Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
704 public void testWrite() throws Exception {
705 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
707 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
709 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
710 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
712 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
715 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
717 verify(mockActorContext).executeOperationAsync(
718 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
720 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
721 WriteDataReply.SERIALIZABLE_CLASS);
724 @Test(expected=IllegalStateException.class)
725 public void testWritePreConditionCheck() {
727 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
730 transactionProxy.write(TestModel.TEST_PATH,
731 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
734 @Test(expected=IllegalStateException.class)
735 public void testWriteAfterReadyPreConditionCheck() {
737 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
740 transactionProxy.ready();
742 transactionProxy.write(TestModel.TEST_PATH,
743 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
747 public void testMerge() throws Exception {
748 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
750 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
752 doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
753 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
755 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
757 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
759 verify(mockActorContext).executeOperationAsync(
760 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
762 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
763 MergeDataReply.SERIALIZABLE_CLASS);
767 public void testDelete() throws Exception {
768 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
770 doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
771 eq(actorSelection(actorRef)), eqSerializedDeleteData());
773 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
776 transactionProxy.delete(TestModel.TEST_PATH);
778 verify(mockActorContext).executeOperationAsync(
779 eq(actorSelection(actorRef)), eqSerializedDeleteData());
781 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
782 DeleteDataReply.SERIALIZABLE_CLASS);
785 private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
786 Object... expReplies) throws Exception {
787 assertEquals("getReadyOperationFutures size", expReplies.length,
788 proxy.getCohortFutures().size());
791 for( Future<ActorSelection> future: proxy.getCohortFutures()) {
792 assertNotNull("Ready operation Future is null", future);
794 Object expReply = expReplies[i++];
795 if(expReply instanceof ActorSelection) {
796 ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
797 assertEquals("Cohort actor path", expReply, actual);
799 // Expecting exception.
801 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
802 fail("Expected exception from ready operation Future");
803 } catch(Exception e) {
810 @SuppressWarnings("unchecked")
812 public void testReady() throws Exception {
813 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
815 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
817 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
818 eq(actorSelection(actorRef)), eqSerializedReadData());
820 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
821 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
823 doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
824 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
826 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
829 transactionProxy.read(TestModel.TEST_PATH);
831 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
833 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
835 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
837 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
839 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
840 WriteDataReply.SERIALIZABLE_CLASS);
842 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
845 @SuppressWarnings("unchecked")
847 public void testReadyForwardCompatibility() throws Exception {
848 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
850 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
852 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
853 eq(actorSelection(actorRef)), eqSerializedReadData());
855 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
856 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
858 doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
859 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
861 doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
862 eq(actorRef.path().toString()));
864 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
867 transactionProxy.read(TestModel.TEST_PATH);
869 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
871 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
873 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
875 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
877 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
878 WriteDataReply.SERIALIZABLE_CLASS);
880 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
882 verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
883 eq(actorRef.path().toString()));
886 @SuppressWarnings("unchecked")
888 public void testReadyWithRecordingOperationFailure() throws Exception {
889 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
891 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
893 doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
894 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
896 doReturn(Futures.failed(new TestException())).when(mockActorContext).
897 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
899 doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
900 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
902 doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString());
904 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
907 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
909 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
911 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
913 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
915 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
917 verifyCohortFutures(proxy, TestException.class);
919 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
920 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
923 @SuppressWarnings("unchecked")
925 public void testReadyWithReplyFailure() throws Exception {
926 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
928 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
930 doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
931 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
933 doReturn(Futures.failed(new TestException())).when(mockActorContext).
934 executeOperationAsync(eq(actorSelection(actorRef)),
935 isA(ReadyTransaction.SERIALIZABLE_CLASS));
937 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
940 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
942 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
944 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
946 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
948 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
949 MergeDataReply.SERIALIZABLE_CLASS);
951 verifyCohortFutures(proxy, TestException.class);
955 public void testReadyWithInitialCreateTransactionFailure() throws Exception {
957 doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
958 mockActorContext).findPrimaryShardAsync(anyString());
960 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
963 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
965 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
967 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
969 transactionProxy.delete(TestModel.TEST_PATH);
971 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
973 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
975 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
977 verifyCohortFutures(proxy, PrimaryNotFoundException.class);
980 @SuppressWarnings("unchecked")
982 public void testReadyWithInvalidReplyMessageType() throws Exception {
983 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
985 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
987 doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
988 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
990 doReturn(Futures.successful(new Object())).when(mockActorContext).
991 executeOperationAsync(eq(actorSelection(actorRef)),
992 isA(ReadyTransaction.SERIALIZABLE_CLASS));
994 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
997 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
999 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1001 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1003 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1005 verifyCohortFutures(proxy, IllegalArgumentException.class);
1009 public void testGetIdentifier() {
1010 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
1011 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1012 TransactionProxy.TransactionType.READ_ONLY);
1014 Object id = transactionProxy.getIdentifier();
1015 assertNotNull("getIdentifier returned null", id);
1016 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
1019 @SuppressWarnings("unchecked")
1021 public void testClose() throws Exception{
1022 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1024 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
1025 eq(actorSelection(actorRef)), eqSerializedReadData());
1027 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
1030 transactionProxy.read(TestModel.TEST_PATH);
1032 transactionProxy.close();
1034 verify(mockActorContext).sendOperationAsync(
1035 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
1040 * Method to test a local Tx actor. The Tx paths are matched to decide if the
1041 * Tx actor is local or not. This is done by mocking the Tx actor path
1042 * and the caller paths and ensuring that the paths have the remote-address format
1044 * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
1045 * the paths returned for the actors for all the tests are not qualified remote paths.
1046 * Hence are treated as non-local/remote actors. In short, all tests except
1047 * few below run for remote actors
1052 public void testLocalTxActorRead() throws Exception {
1053 ActorSystem actorSystem = getSystem();
1054 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1056 doReturn(actorSystem.actorSelection(shardActorRef.path())).
1057 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1059 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1060 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1062 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1063 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1064 .setTransactionId("txn-1")
1065 .setTransactionActorPath(actorPath)
1068 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1069 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1070 eqCreateTransaction(memberName, READ_ONLY));
1072 doReturn(true).when(mockActorContext).isLocalPath(actorPath);
1074 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
1076 // negative test case with null as the reply
1077 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
1078 any(ActorSelection.class), eqReadData());
1080 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1081 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1083 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
1085 // test case with node as read data reply
1086 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1088 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1089 any(ActorSelection.class), eqReadData());
1091 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1093 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1095 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
1097 // test for local data exists
1098 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1099 any(ActorSelection.class), eqDataExists());
1101 boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1103 assertEquals("Exists response", true, exists);
1107 public void testLocalTxActorWrite() throws Exception {
1108 ActorSystem actorSystem = getSystem();
1109 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1111 doReturn(actorSystem.actorSelection(shardActorRef.path())).
1112 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1114 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
1115 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
1117 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
1118 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
1119 .setTransactionId("txn-1")
1120 .setTransactionActorPath(actorPath)
1123 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
1124 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1125 eqCreateTransaction(memberName, WRITE_ONLY));
1127 doReturn(true).when(mockActorContext).isLocalPath(actorPath);
1129 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1131 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
1132 any(ActorSelection.class), eqWriteData(nodeToWrite));
1134 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
1135 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1137 verify(mockActorContext).executeOperationAsync(
1138 any(ActorSelection.class), eqWriteData(nodeToWrite));
1140 //testing local merge
1141 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
1142 any(ActorSelection.class), eqMergeData(nodeToWrite));
1144 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1146 verify(mockActorContext).executeOperationAsync(
1147 any(ActorSelection.class), eqMergeData(nodeToWrite));
1150 //testing local delete
1151 doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
1152 any(ActorSelection.class), eqDeleteData());
1154 transactionProxy.delete(TestModel.TEST_PATH);
1156 verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
1158 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1159 WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
1162 doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
1163 any(ActorSelection.class), isA(ReadyTransaction.class));
1165 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
1167 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
1169 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
1171 verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));