1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorPath;
4 import akka.actor.ActorRef;
5 import akka.actor.ActorSelection;
6 import akka.actor.Props;
7 import akka.dispatch.Futures;
8 import com.google.common.base.Optional;
9 import com.google.common.util.concurrent.CheckedFuture;
10 import org.junit.Before;
11 import org.junit.Test;
12 import org.mockito.ArgumentMatcher;
13 import org.mockito.Mock;
14 import org.mockito.MockitoAnnotations;
15 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
16 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
17 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
20 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
21 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
22 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
23 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
24 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
25 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
31 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
32 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
33 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
36 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
37 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import scala.concurrent.Await;
45 import scala.concurrent.Future;
46 import scala.concurrent.duration.Duration;
48 import java.util.List;
49 import java.util.concurrent.TimeUnit;
51 import static org.junit.Assert.assertEquals;
52 import static org.junit.Assert.assertNotNull;
53 import static org.junit.Assert.assertTrue;
54 import static org.junit.Assert.fail;
55 import static org.mockito.Matchers.any;
56 import static org.mockito.Matchers.anyString;
57 import static org.mockito.Mockito.argThat;
58 import static org.mockito.Mockito.doReturn;
59 import static org.mockito.Mockito.doThrow;
60 import static org.mockito.Mockito.eq;
61 import static org.mockito.Mockito.isA;
62 import static org.mockito.Mockito.times;
63 import static org.mockito.Mockito.verify;
64 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
65 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
66 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
68 @SuppressWarnings("resource")
69 public class TransactionProxyTest extends AbstractActorTest {
/**
 * Unchecked exception type private to this test class. Tests inject it as the failure of a
 * stubbed operation and then expect it to surface again.
 */
71 @SuppressWarnings("serial")
72 static class TestException extends RuntimeException {
/**
 * Callback that performs a single operation on a {@code TransactionProxy} and hands back the
 * resulting future, so one shared helper can drive different proxy methods (read, exists).
 */
75 static interface Invoker {
// Runs the operation under test against the given proxy and returns its future.
76 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
// Configuration passed to ShardStrategyFactory during fixture initialization.
79 private final Configuration configuration = new MockConfiguration();
// ActorContext test double that every test stubs and verifies against.
// NOTE(review): presumably populated by MockitoAnnotations.initMocks through an annotation
// that is not visible in this excerpt - confirm.
82 private ActorContext mockActorContext;
// Schema context built from TestModel; passed to the message (de)serialization helpers below.
84 private SchemaContext schemaContext;
// Member name reported by the mocked context; tests expect transaction ids to start with it.
86 String memberName = "mock-member";
// Fixture initialization. The enclosing method's signature is not visible in this excerpt;
// presumably a JUnit @Before method - confirm.
// Initializes the Mockito-managed fields of this test instance.
90 MockitoAnnotations.initMocks(this);
92 schemaContext = TestModel.createTestContext();
// Baseline stubbing shared by all tests: the mocked context returns the test actor system,
// the fixed member name and the test schema context.
94 doReturn(getSystem()).when(mockActorContext).getActorSystem();
95 doReturn(memberName).when(mockActorContext).getCurrentMemberName();
96 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
// The shard strategy factory is configured statically with the mock configuration.
98 ShardStrategyFactory.setConfiguration(configuration);
/**
 * Creates and registers a Mockito argument matcher for a serialized CreateTransaction message.
 * The matcher accepts an argument whose transaction id starts with {@code memberName} and
 * whose transaction type equals {@code type.ordinal()}.
 *
 * @param memberName expected prefix of the transaction id
 * @param type expected transaction type
 * @return the value returned by {@code argThat}, to be used as a stubbing/verification argument
 */
101 private CreateTransaction eqCreateTransaction(final String memberName,
102 final TransactionType type) {
103 ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
105 public boolean matches(Object argument) {
// NOTE(review): unlike the other matchers in this file, the argument's class is not checked
// before deserializing - confirm fromSerializable tolerates other message types.
106 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
107 return obj.getTransactionId().startsWith(memberName) &&
108 obj.getTransactionType() == type.ordinal();
112 return argThat(matcher);
/**
 * Creates and registers a Mockito argument matcher for a serialized DataExists message.
 * The matcher accepts an argument whose class equals {@code DataExists.SERIALIZABLE_CLASS}
 * and whose deserialized path equals {@code TestModel.TEST_PATH}.
 *
 * @return the value returned by {@code argThat}
 */
115 private DataExists eqDataExists() {
116 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
118 public boolean matches(Object argument) {
// Class check comes first so that only DataExists messages are deserialized.
119 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
120 DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
124 return argThat(matcher);
/**
 * Creates and registers a Mockito argument matcher for a serialized ReadData message.
 * The matcher accepts an argument whose class equals {@code ReadData.SERIALIZABLE_CLASS}
 * and whose deserialized path equals {@code TestModel.TEST_PATH}.
 *
 * @return the value returned by {@code argThat}
 */
127 private ReadData eqReadData() {
128 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
130 public boolean matches(Object argument) {
// Class check comes first so that only ReadData messages are deserialized.
131 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
132 ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
136 return argThat(matcher);
/**
 * Creates and registers a Mockito argument matcher for a serialized WriteData message.
 * A WriteData argument is accepted when its path equals {@code TestModel.TEST_PATH} and its
 * data equals {@code nodeToWrite}.
 *
 * @param nodeToWrite the node the written data must be equal to
 * @return the value returned by {@code argThat}
 */
139 private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
140 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
142 public boolean matches(Object argument) {
// Guard for arguments of any other class. The branch body is not visible in this excerpt;
// presumably it rejects the argument - confirm.
143 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
// Deserialization uses the fixture's schema context.
147 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
148 return obj.getPath().equals(TestModel.TEST_PATH) &&
149 obj.getData().equals(nodeToWrite);
153 return argThat(matcher);
/**
 * Creates and registers a Mockito argument matcher for a serialized MergeData message.
 * A MergeData argument is accepted when its path equals {@code TestModel.TEST_PATH} and its
 * data equals {@code nodeToWrite}.
 *
 * @param nodeToWrite the node the merged data must be equal to
 * @return the value returned by {@code argThat}
 */
156 private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
157 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
159 public boolean matches(Object argument) {
// Guard for arguments of any other class. The branch body is not visible in this excerpt;
// presumably it rejects the argument - confirm.
160 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
// Deserialization uses the fixture's schema context.
164 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
165 return obj.getPath().equals(TestModel.TEST_PATH) &&
166 obj.getData().equals(nodeToWrite);
170 return argThat(matcher);
/**
 * Creates and registers a Mockito argument matcher for a serialized DeleteData message.
 * The matcher accepts an argument whose class equals {@code DeleteData.SERIALIZABLE_CLASS}
 * and whose deserialized path equals {@code TestModel.TEST_PATH}.
 *
 * @return the value returned by {@code argThat}
 */
173 private DeleteData eqDeleteData() {
174 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
176 public boolean matches(Object argument) {
// Class check comes first so that only DeleteData messages are deserialized.
177 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
178 DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
182 return argThat(matcher);
/**
 * Builds an already-successful Future holding the serialized form of a ReadyTransactionReply
 * created for the given actor path.
 *
 * @param path actor path to put into the reply
 * @return a completed Future carrying the serialized reply
 */
185 private Future<Object> readyTxReply(ActorPath path) {
// The explicit cast to Object makes the resulting future a Future<Object>.
186 return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
/**
 * Builds an already-successful Future holding the serialized form of a ReadDataReply created
 * from the fixture's schema context and the given node.
 *
 * @param data node to put into the reply; tests also pass {@code null} for "no data"
 * @return a completed Future carrying the serialized reply
 */
189 private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
190 return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
/**
 * Builds an already-successful Future holding the serialized form of a DataExistsReply.
 *
 * @param exists the flag to put into the reply
 * @return a completed Future carrying the serialized reply
 */
193 private Future<Object> dataExistsReply(boolean exists) {
194 return Futures.successful(new DataExistsReply(exists).toSerializable());
/**
 * Builds an already-successful Future holding the serialized form of a WriteDataReply.
 *
 * @return a completed Future carrying the serialized reply
 */
197 private Future<Object> writeDataReply() {
198 return Futures.successful(new WriteDataReply().toSerializable());
/**
 * Builds an already-successful Future holding the serialized form of a MergeDataReply.
 *
 * @return a completed Future carrying the serialized reply
 */
201 private Future<Object> mergeDataReply() {
202 return Futures.successful(new MergeDataReply().toSerializable());
/**
 * Builds an already-successful Future holding the serialized form of a DeleteDataReply.
 *
 * @return a completed Future carrying the serialized reply
 */
205 private Future<Object> deleteDataReply() {
206 return Futures.successful(new DeleteDataReply().toSerializable());
/**
 * Shorthand for obtaining an ActorSelection for the given actor's path from the test actor system.
 *
 * @param actorRef actor whose path is selected
 * @return the selection for {@code actorRef.path()}
 */
209 private ActorSelection actorSelection(ActorRef actorRef) {
210 return getSystem().actorSelection(actorRef.path());
/**
 * Builds a CreateTransactionReply whose transaction actor path is the string form of the given
 * actor's path and whose transaction id is the fixed value "txn-1".
 *
 * @param actorRef actor to advertise as the transaction actor
 * @return the built reply message
 */
213 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
214 return CreateTransactionReply.newBuilder()
215 .setTransactionActorPath(actorRef.path().toString())
216 .setTransactionId("txn-1").build();
/**
 * Creates a DoNothingActor and stubs the mocked ActorContext so that creating a transaction
 * of the given type resolves to that actor.
 *
 * @param type transaction type the CreateTransaction message must carry to match the stub
 * @return presumably the created actor's ActorRef - the return statement is not visible in
 *         this excerpt, but callers use the result as the transaction actor
 */
219 private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
220 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
// Looking the actor up by its path string yields a selection of that actor.
221 doReturn(getSystem().actorSelection(actorRef.path())).
222 when(mockActorContext).actorSelection(actorRef.path().toString());
// The default shard's primary is reported as this actor.
224 doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
225 when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
// A matching CreateTransaction sent to the actor is answered with a reply pointing back at it.
227 doReturn(createTransactionReply(actorRef)).when(mockActorContext).
228 executeOperation(eq(getSystem().actorSelection(actorRef.path())),
229 eqCreateTransaction(memberName, type));
// Path resolution returns the actor's path string unchanged, whatever the first argument is.
231 doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
232 anyString(), eq(actorRef.path().toString()));
234 doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
/**
 * Waits up to 5 seconds for the given future and fails the test if it completes without
 * throwing ReadFailedException.
 *
 * NOTE(review): the catch body is not visible in this excerpt. Judging by the method name and
 * the exception types expected by its callers, it presumably rethrows the exception's cause -
 * confirm.
 *
 * @param future the future expected to fail with ReadFailedException
 */
239 private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
243 future.checkedGet(5, TimeUnit.SECONDS);
244 fail("Expected ReadFailedException");
245 } catch(ReadFailedException e) {
/**
 * Reads TEST_PATH twice against a READ_ONLY setup: first with a stubbed reply built from a
 * null node (expects an absent Optional), then with a reply carrying a container node
 * (expects that node back).
 */
251 public void testRead() throws Exception {
252 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
254 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
// First read: the reply carries no data.
257 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
258 eq(actorSelection(actorRef)), eqReadData());
260 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
261 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
263 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
// Second read: re-stub the same call so the reply now carries the expected node.
265 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
267 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
268 eq(actorSelection(actorRef)), eqReadData());
270 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
272 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
274 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
/**
 * Expects read() to fail with ReadFailedException when the asynchronous operation completes
 * successfully but with a plain Object instead of a read reply.
 */
277 @Test(expected = ReadFailedException.class)
278 public void testReadWithInvalidReplyMessageType() throws Exception {
279 setupActorContextWithInitialCreateTransaction(READ_ONLY);
// Every async operation on any selection answers with an unrelated object.
281 doReturn(Futures.successful(new Object())).when(mockActorContext).
282 executeOperationAsync(any(ActorSelection.class), any());
284 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
287 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
/**
 * Expects a TestException to surface from read() when the asynchronous operation's future
 * has failed with a TestException.
 *
 * NOTE(review): "Operaton" in the method name looks like a typo for "Operation".
 */
290 @Test(expected = TestException.class)
291 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
292 setupActorContextWithInitialCreateTransaction(READ_ONLY);
// Every async operation on any selection returns an already-failed future.
294 doReturn(Futures.failed(new TestException())).when(mockActorContext).
295 executeOperationAsync(any(ActorSelection.class), any());
297 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
300 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
/**
 * Shared driver for tests in which the initial transaction creation fails: stubs the mocked
 * context for the given exception, creates a READ_ONLY proxy, runs {@code invoker} on it and
 * passes the resulting future to propagateReadFailedExceptionCause.
 *
 * @param exToThrow exception to inject; a PrimaryNotFoundException is simulated by an absent
 *                  primary shard
 * @param invoker the proxy operation to exercise
 */
303 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
305 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
// Primary-not-found case: no primary shard is reported for any shard name.
307 if (exToThrow instanceof PrimaryNotFoundException) {
308 doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
// Other case: a primary is reported. The branch delimiters are not visible in this excerpt;
// presumably this stub sits in the else branch - confirm.
310 doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
311 when(mockActorContext).findPrimaryShard(anyString());
// Any synchronous operation on any selection throws the injected exception.
313 doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any());
315 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
317 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
/**
 * Runs testExceptionOnInitialCreateTransaction with an invoker that calls read(TEST_PATH).
 *
 * @param exToThrow exception to inject into the initial transaction creation
 */
320 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
321 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
323 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
324 return proxy.read(TestModel.TEST_PATH);
/** Expects a PrimaryNotFoundException from read() when no primary shard is reported. */
329 @Test(expected = PrimaryNotFoundException.class)
330 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
331 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
/** Expects a TimeoutException from read() when the stubbed executeOperation throws one. */
334 @Test(expected = TimeoutException.class)
335 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
336 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
337 new Exception("reason")));
/** Expects a TestException from read() when the stubbed executeOperation throws one. */
340 @Test(expected = TestException.class)
341 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
342 testReadWithExceptionOnInitialCreateTransaction(new TestException());
/**
 * Expects read() to surface a TestException when an earlier delete on the same READ_WRITE
 * setup failed, and verifies that the read request itself was never sent.
 */
345 @Test(expected = TestException.class)
346 public void testReadWithPriorRecordingOperationFailure() throws Throwable {
347 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
349 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// The write succeeds ...
351 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
352 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
// ... the delete fails ...
354 doReturn(Futures.failed(new TestException())).when(mockActorContext).
355 executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
// ... and the read is stubbed as well, although it is expected never to be invoked.
357 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
358 eq(actorSelection(actorRef)), eqReadData());
360 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
363 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
365 transactionProxy.delete(TestModel.TEST_PATH);
368 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
// No ReadData message may have been sent to the transaction actor.
// NOTE(review): lines around this verification are missing from the excerpt; presumably it
// is placed so that it still runs when the call above throws - confirm.
370 verify(mockActorContext, times(0)).executeOperationAsync(
371 eq(actorSelection(actorRef)), eqReadData());
/**
 * Writes a node and then reads TEST_PATH on a READ_WRITE setup where both operations are
 * stubbed to succeed; expects the read to return the stubbed node.
 */
376 public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
377 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
379 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
381 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
382 eq(actorSelection(actorRef)), eqWriteData(expectedNode));
384 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
385 eq(actorSelection(actorRef)), eqReadData());
387 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
390 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
392 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
393 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
395 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
397 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
/**
 * Expects read() to be rejected with IllegalStateException.
 * NOTE(review): the transaction type passed to the constructor is on a line not visible in
 * this excerpt; presumably a type that does not permit reads - confirm.
 */
400 @Test(expected=IllegalStateException.class)
401 public void testReadPreConditionCheck() {
403 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
406 transactionProxy.read(TestModel.TEST_PATH);
/**
 * Calls exists(TEST_PATH) twice against a READ_ONLY setup: first with a stubbed reply of
 * false, then with a stubbed reply of true, and checks that each value is returned.
 */
410 public void testExists() throws Exception {
411 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
413 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
416 doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
417 eq(actorSelection(actorRef)), eqDataExists());
419 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
421 assertEquals("Exists response", false, exists);
// Re-stub the same call with the opposite answer.
423 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
424 eq(actorSelection(actorRef)), eqDataExists());
426 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
428 assertEquals("Exists response", true, exists);
/** Expects a PrimaryNotFoundException from exists() when no primary shard is reported. */
431 @Test(expected = PrimaryNotFoundException.class)
432 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
433 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
435 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
436 return proxy.exists(TestModel.TEST_PATH);
/**
 * Expects exists() to fail with ReadFailedException when the asynchronous operation completes
 * successfully but with a plain Object instead of an exists reply.
 */
441 @Test(expected = ReadFailedException.class)
442 public void testExistsWithInvalidReplyMessageType() throws Exception {
443 setupActorContextWithInitialCreateTransaction(READ_ONLY);
// Every async operation on any selection answers with an unrelated object.
445 doReturn(Futures.successful(new Object())).when(mockActorContext).
446 executeOperationAsync(any(ActorSelection.class), any());
448 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
451 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
/**
 * Expects a TestException to surface from exists() when the asynchronous operation's future
 * has failed with a TestException.
 *
 * NOTE(review): "Operaton" in the method name looks like a typo for "Operation".
 */
454 @Test(expected = TestException.class)
455 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
456 setupActorContextWithInitialCreateTransaction(READ_ONLY);
// Every async operation on any selection returns an already-failed future.
458 doReturn(Futures.failed(new TestException())).when(mockActorContext).
459 executeOperationAsync(any(ActorSelection.class), any());
461 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
464 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
/**
 * Expects exists() to surface a TestException when an earlier delete on the same READ_WRITE
 * setup failed, and verifies that the exists request itself was never sent.
 */
467 @Test(expected = TestException.class)
468 public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
469 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
471 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// The write succeeds ...
473 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
474 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
// ... the delete fails ...
476 doReturn(Futures.failed(new TestException())).when(mockActorContext).
477 executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
// ... and the exists call is stubbed as well, although it is expected never to be invoked.
479 doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
480 eq(actorSelection(actorRef)), eqDataExists());
482 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
485 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
487 transactionProxy.delete(TestModel.TEST_PATH);
490 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
// No DataExists message may have been sent to the transaction actor.
// NOTE(review): lines around this verification are missing from the excerpt; presumably it
// is placed so that it still runs when the call above throws - confirm.
492 verify(mockActorContext, times(0)).executeOperationAsync(
493 eq(actorSelection(actorRef)), eqDataExists());
/**
 * Writes a node and then calls exists(TEST_PATH) on a READ_WRITE setup where both operations
 * are stubbed to succeed; expects the stubbed value true to be returned.
 */
498 public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
499 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
501 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
503 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
504 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
506 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
507 eq(actorSelection(actorRef)), eqDataExists());
509 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
512 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
514 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
516 assertEquals("Exists response", true, exists);
/**
 * Expects exists() to be rejected with IllegalStateException.
 * NOTE(review): the method name looks like a typo for testExistsPreConditionCheck.
 * NOTE(review): the transaction type passed to the constructor is on a line not visible in
 * this excerpt; presumably a type that does not permit reads - confirm.
 */
519 @Test(expected=IllegalStateException.class)
520 public void testxistsPreConditionCheck() {
522 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
525 transactionProxy.exists(TestModel.TEST_PATH);
/**
 * Checks a list of recorded-operation futures against the expected outcomes, position by
 * position. The list size must equal the number of expected types. For an expected type that
 * is a Throwable subtype, awaiting the future (up to 5 seconds) must throw; otherwise the
 * awaited result's class must equal the expected type.
 *
 * @param futures the futures to check
 * @param expResultTypes expected result class, or a Throwable class for an expected failure,
 *                       per future
 */
528 private void verifyRecordingOperationFutures(List<Future<Object>> futures,
529 Class<?>... expResultTypes) throws Exception {
530 assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
// The index i used below is declared on a line not visible in this excerpt.
533 for( Future<Object> future: futures) {
534 assertNotNull("Recording operation Future is null", future);
536 Class<?> expResultType = expResultTypes[i++];
// Expected failure: the await must not return normally.
537 if(Throwable.class.isAssignableFrom(expResultType)) {
539 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
540 fail("Expected exception from recording operation Future");
541 } catch(Exception e) {
// Expected success: compare the exact class of the awaited result.
545 assertEquals("Recording operation Future result type", expResultType,
546 Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
/**
 * Writes a node on a WRITE_ONLY setup and checks that a matching WriteData message was sent
 * to the transaction actor and that one recorded future completes with a WriteDataReply.
 */
552 public void testWrite() throws Exception {
553 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
555 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
557 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
558 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
560 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
563 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
565 verify(mockActorContext).executeOperationAsync(
566 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
568 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
569 WriteDataReply.SERIALIZABLE_CLASS);
/**
 * Expects write() to be rejected with IllegalStateException.
 * NOTE(review): the transaction type passed to the constructor is on a line not visible in
 * this excerpt; presumably a type that does not permit writes - confirm.
 */
572 @Test(expected=IllegalStateException.class)
573 public void testWritePreConditionCheck() {
575 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
578 transactionProxy.write(TestModel.TEST_PATH,
579 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
 * Expects IllegalStateException when write() is called after ready() has already been called
 * on the same proxy.
 */
582 @Test(expected=IllegalStateException.class)
583 public void testWriteAfterReadyPreConditionCheck() {
585 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
588 transactionProxy.ready();
590 transactionProxy.write(TestModel.TEST_PATH,
591 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
 * Merges a node on a WRITE_ONLY setup and checks that a matching MergeData message was sent
 * to the transaction actor and that one recorded future completes with a MergeDataReply.
 */
595 public void testMerge() throws Exception {
596 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
598 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
600 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
601 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
603 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
606 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
608 verify(mockActorContext).executeOperationAsync(
609 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
611 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
612 MergeDataReply.SERIALIZABLE_CLASS);
/**
 * Deletes TEST_PATH on a WRITE_ONLY setup and checks that a matching DeleteData message was
 * sent to the transaction actor and that one recorded future completes with a DeleteDataReply.
 */
616 public void testDelete() throws Exception {
617 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
619 doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
620 eq(actorSelection(actorRef)), eqDeleteData());
622 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
625 transactionProxy.delete(TestModel.TEST_PATH);
627 verify(mockActorContext).executeOperationAsync(
628 eq(actorSelection(actorRef)), eqDeleteData());
630 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
631 DeleteDataReply.SERIALIZABLE_CLASS);
/**
 * Checks the cohort path futures of a ThreePhaseCommitCohortProxy against the expected
 * outcomes, position by position. The number of futures must equal the number of expected
 * replies. When an expected reply is an ActorPath, the awaited result (up to 5 seconds) must
 * equal it; any other expected value means awaiting the future must throw.
 *
 * @param proxy cohort proxy whose cohort path futures are checked
 * @param expReplies an ActorPath for an expected success, or any other object (callers pass
 *                   an exception class) for an expected failure, per future
 */
634 private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
635 Object... expReplies) throws Exception {
// NOTE(review): the message says "getReadyOperationFutures" while the call below is
// getCohortPathFutures() - the message looks stale.
636 assertEquals("getReadyOperationFutures size", expReplies.length,
637 proxy.getCohortPathFutures().size());
// The index i used below is declared on a line not visible in this excerpt.
640 for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
641 assertNotNull("Ready operation Future is null", future);
643 Object expReply = expReplies[i++];
644 if(expReply instanceof ActorPath) {
645 ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
646 assertEquals("Cohort actor path", expReply, actual);
648 // Expecting exception.
650 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
651 fail("Expected exception from ready operation Future");
652 } catch(Exception e) {
/**
 * Reads, writes and then readies a transaction on a READ_WRITE setup where all three
 * operations are stubbed to succeed. Expects a ThreePhaseCommitCohortProxy, a single recorded
 * future completing with a WriteDataReply, and a cohort path future completing with the
 * transaction actor's path.
 */
659 @SuppressWarnings("unchecked")
661 public void testReady() throws Exception {
662 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
664 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
666 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
667 eq(actorSelection(actorRef)), eqReadData());
669 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
670 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
// The ready request is answered with a reply carrying the actor's own path.
672 doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
673 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
675 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
678 transactionProxy.read(TestModel.TEST_PATH);
680 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
682 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
684 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
686 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
// Only one recorded future is expected here, for the write.
688 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
689 WriteDataReply.SERIALIZABLE_CLASS);
691 verifyCohortPathFutures(proxy, actorRef.path());
/**
 * Merges (stubbed to succeed), writes (stubbed to fail with TestException) and then readies a
 * transaction on a WRITE_ONLY setup. Expects the recorded futures to be a MergeDataReply
 * followed by a failure, and the single cohort path future to fail.
 */
694 @SuppressWarnings("unchecked")
696 public void testReadyWithRecordingOperationFailure() throws Exception {
697 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
699 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
701 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
702 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
// The write is the recording operation that fails.
704 doReturn(Futures.failed(new TestException())).when(mockActorContext).
705 executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
// The ready request itself is stubbed to succeed.
707 doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
708 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
710 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
713 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
715 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
717 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
719 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
721 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
723 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
724 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
// Passing a class instead of an ActorPath makes the helper expect a failed future.
726 verifyCohortPathFutures(proxy, TestException.class);
/**
 * Merges (stubbed to succeed) and then readies a transaction on a WRITE_ONLY setup where the
 * ready request's future has failed with TestException. Expects one recorded future
 * completing with a MergeDataReply and a failed cohort path future.
 */
729 @SuppressWarnings("unchecked")
731 public void testReadyWithReplyFailure() throws Exception {
732 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
734 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
736 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
737 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
// The ready request is the operation that fails.
739 doReturn(Futures.failed(new TestException())).when(mockActorContext).
740 executeOperationAsync(eq(actorSelection(actorRef)),
741 isA(ReadyTransaction.SERIALIZABLE_CLASS));
743 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
746 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
748 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
750 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
752 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
754 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
755 MergeDataReply.SERIALIZABLE_CLASS);
// Passing a class instead of an ActorPath makes the helper expect a failed future.
757 verifyCohortPathFutures(proxy, TestException.class);
/**
 * Performs merge, write and delete followed by ready() while no primary shard is reported
 * for any shard name. Expects a ThreePhaseCommitCohortProxy whose single cohort path future
 * fails.
 */
761 public void testReadyWithInitialCreateTransactionFailure() throws Exception {
763 doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
// NOTE(review): the commented-out stubbing below looks like a leftover from an earlier
// approach - consider deleting it.
764 // doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
765 // anyString(), any());
767 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
770 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
772 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
774 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
776 transactionProxy.delete(TestModel.TEST_PATH);
778 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
780 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
782 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
// Passing a class instead of an ActorPath makes the helper expect a failed future.
784 verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
/**
 * Writes (stubbed to succeed) and then readies a transaction on a WRITE_ONLY setup where the
 * ready request completes successfully but with a plain Object instead of a ready reply.
 * Expects a ThreePhaseCommitCohortProxy whose single cohort path future fails.
 */
787 @SuppressWarnings("unchecked")
789 public void testReadyWithInvalidReplyMessageType() throws Exception {
790 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
792 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
794 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
795 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
// The ready request answers with an unrelated object.
797 doReturn(Futures.successful(new Object())).when(mockActorContext).
798 executeOperationAsync(eq(actorSelection(actorRef)),
799 isA(ReadyTransaction.SERIALIZABLE_CLASS));
801 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
804 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
806 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
808 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
810 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
// Passing a class instead of an ActorPath makes the helper expect a failed future.
812 verifyCohortPathFutures(proxy, IllegalArgumentException.class);
/**
 * Checks that a READ_ONLY proxy returns a non-null identifier whose string form starts with
 * the mocked member name.
 */
816 public void testGetIdentifier() {
817 setupActorContextWithInitialCreateTransaction(READ_ONLY);
// NOTE(review): READ_ONLY is statically imported and used unqualified on the line above; the
// fully qualified form below is inconsistent with the rest of the file.
818 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
819 TransactionProxy.TransactionType.READ_ONLY);
821 Object id = transactionProxy.getIdentifier();
822 assertNotNull("getIdentifier returned null", id);
823 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
/**
 * Reads once on a READ_WRITE setup and then closes the proxy; verifies that a message of
 * class CloseTransaction.SERIALIZABLE_CLASS was sent to the transaction actor through
 * sendOperationAsync.
 */
826 @SuppressWarnings("unchecked")
828 public void testClose() throws Exception{
829 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
831 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
832 eq(actorSelection(actorRef)), eqReadData());
834 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
// A read is issued before closing, so the proxy has interacted with the transaction actor.
837 transactionProxy.read(TestModel.TEST_PATH);
839 transactionProxy.close();
841 verify(mockActorContext).sendOperationAsync(
842 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));