1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertTrue;
4 import static org.junit.Assert.assertEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.fail;
7 import akka.actor.ActorPath;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.Props;
11 import akka.dispatch.Futures;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
16 import org.junit.Before;
17 import org.junit.Test;
18 import org.mockito.ArgumentMatcher;
19 import org.mockito.Mock;
20 import org.mockito.MockitoAnnotations;
22 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
24 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
26 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
27 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
28 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
29 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
32 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
34 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
36 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
42 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
43 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
44 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
45 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
46 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
47 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
50 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 import scala.concurrent.Await;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import scala.concurrent.duration.FiniteDuration;
61 import java.util.List;
62 import java.util.concurrent.TimeUnit;
64 import static org.mockito.Matchers.any;
65 import static org.mockito.Matchers.anyString;
66 import static org.mockito.Mockito.doReturn;
67 import static org.mockito.Mockito.doThrow;
68 import static org.mockito.Mockito.argThat;
69 import static org.mockito.Mockito.eq;
70 import static org.mockito.Mockito.verify;
71 import static org.mockito.Mockito.isA;
72 import static org.mockito.Mockito.times;
74 @SuppressWarnings("resource")
75 public class TransactionProxyTest extends AbstractActorTest {
/**
 * Unchecked exception the tests inject as a recognizable failure: it is wrapped in
 * failed futures and thrown from stubs, and several tests declare it as the expected
 * exception.
 */
77 @SuppressWarnings("serial")
78 static class TestException extends RuntimeException {
/**
 * Callback that performs a single operation on a {@link TransactionProxy} and hands back
 * the resulting future, so that one helper can drive different proxy operations.
 */
81 static interface Invoker {
/** Runs the operation under test against {@code proxy} and returns its future. */
82 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
// Configuration handed to ShardStrategyFactory during test setup.
85 private final Configuration configuration = new MockConfiguration();
// Actor context whose calls are stubbed with doReturn/doThrow by the setup code and tests.
88 private ActorContext mockActorContext;
// Assigned from TestModel.createTestContext() during test setup.
90 private SchemaContext schemaContext;
// Value the stubbed getCurrentMemberName() returns; transaction ids are checked to start with it.
92 String memberName = "mock-member";
// NOTE(review): the enclosing method header is not visible in this excerpt; presumably
// this is the JUnit @Before fixture - confirm against the full file.
// Initializes the Mockito-annotated fields of this test instance.
96 MockitoAnnotations.initMocks(this);
98 schemaContext = TestModel.createTestContext();
// Default stubs: actor system, current member name and schema context.
100 doReturn(getSystem()).when(mockActorContext).getActorSystem();
101 doReturn(memberName).when(mockActorContext).getCurrentMemberName();
102 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
// Make the shard strategy factory use the mock configuration.
104 ShardStrategyFactory.setConfiguration(configuration);
107 private CreateTransaction eqCreateTransaction(final String memberName,
108 final TransactionType type) {
109 ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
111 public boolean matches(Object argument) {
112 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
113 return obj.getTransactionId().startsWith(memberName) &&
114 obj.getTransactionType() == type.ordinal();
118 return argThat(matcher);
121 private DataExists eqDataExists() {
122 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
124 public boolean matches(Object argument) {
125 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
126 DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
130 return argThat(matcher);
133 private ReadData eqReadData() {
134 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
136 public boolean matches(Object argument) {
137 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
138 ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
142 return argThat(matcher);
145 private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
146 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
148 public boolean matches(Object argument) {
149 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
153 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
154 return obj.getPath().equals(TestModel.TEST_PATH) &&
155 obj.getData().equals(nodeToWrite);
159 return argThat(matcher);
162 private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
163 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
165 public boolean matches(Object argument) {
166 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
170 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
171 return obj.getPath().equals(TestModel.TEST_PATH) &&
172 obj.getData().equals(nodeToWrite);
176 return argThat(matcher);
179 private DeleteData eqDeleteData() {
180 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
182 public boolean matches(Object argument) {
183 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
184 DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
188 return argThat(matcher);
191 private Future<Object> readyTxReply(ActorPath path) {
192 return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
195 private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
196 return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
199 private Future<Object> dataExistsReply(boolean exists) {
200 return Futures.successful(new DataExistsReply(exists).toSerializable());
203 private Future<Object> writeDataReply() {
204 return Futures.successful(new WriteDataReply().toSerializable());
207 private Future<Object> mergeDataReply() {
208 return Futures.successful(new MergeDataReply().toSerializable());
211 private Future<Object> deleteDataReply() {
212 return Futures.successful(new DeleteDataReply().toSerializable());
215 private ActorSelection actorSelection(ActorRef actorRef) {
216 return getSystem().actorSelection(actorRef.path());
219 private FiniteDuration anyDuration() {
220 return any(FiniteDuration.class);
223 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
224 return CreateTransactionReply.newBuilder()
225 .setTransactionActorPath(actorRef.path().toString())
226 .setTransactionId("txn-1").build();
229 private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
230 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
231 doReturn(getSystem().actorSelection(actorRef.path())).
232 when(mockActorContext).actorSelection(actorRef.path().toString());
233 doReturn(createTransactionReply(actorRef)).when(mockActorContext).
234 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
235 eqCreateTransaction(memberName, type), anyDuration());
236 doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
237 anyString(), eq(actorRef.path().toString()));
238 doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
243 private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
247 future.checkedGet(5, TimeUnit.SECONDS);
248 fail("Expected ReadFailedException");
249 } catch(ReadFailedException e) {
255 public void testRead() throws Exception {
256 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
258 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
261 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
262 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
264 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
265 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
267 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
269 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
271 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
272 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
274 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
276 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
278 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
281 @Test(expected = ReadFailedException.class)
282 public void testReadWithInvalidReplyMessageType() throws Exception {
283 setupActorContextWithInitialCreateTransaction(READ_ONLY);
285 doReturn(Futures.successful(new Object())).when(mockActorContext).
286 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
288 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
291 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
294 @Test(expected = TestException.class)
295 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
296 setupActorContextWithInitialCreateTransaction(READ_ONLY);
298 doReturn(Futures.failed(new TestException())).when(mockActorContext).
299 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
301 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
304 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
/**
 * Shared driver for the "create transaction fails" tests: makes the mocked
 * executeShardOperation throw {@code exToThrow}, builds a TransactionProxy, runs
 * {@code invoker} against it and passes the returned future to
 * propagateReadFailedExceptionCause.
 *
 * NOTE(review): the rest of the signature and the remaining constructor argument(s) are
 * not visible in this excerpt - confirm against the full file.
 */
307 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
// Fail the shard operation for any shard name, message and duration.
310 doThrow(exToThrow).when(mockActorContext).executeShardOperation(
311 anyString(), any(), anyDuration());
313 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
316 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
319 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
320 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
322 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
323 return proxy.read(TestModel.TEST_PATH);
328 @Test(expected = PrimaryNotFoundException.class)
329 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
330 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
333 @Test(expected = TimeoutException.class)
334 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
335 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
336 new Exception("reason")));
339 @Test(expected = TestException.class)
340 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
341 testReadWithExceptionOnInitialCreateTransaction(new TestException());
344 @Test(expected = TestException.class)
345 public void testReadWithPriorRecordingOperationFailure() throws Throwable {
346 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
348 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
350 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
351 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
353 doReturn(Futures.failed(new TestException())).when(mockActorContext).
354 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
357 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
358 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
360 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
363 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
365 transactionProxy.delete(TestModel.TEST_PATH);
368 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
370 verify(mockActorContext, times(0)).executeRemoteOperationAsync(
371 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
376 public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
377 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
379 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
381 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
382 eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
384 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
385 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
387 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
390 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
392 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
393 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
395 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
397 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
// Expects read() on the constructed proxy to be rejected with IllegalStateException.
// NOTE(review): the transaction type passed to the constructor is not visible in this
// excerpt; presumably a type that does not permit reads - confirm against the full file.
400 @Test(expected=IllegalStateException.class)
401 public void testReadPreConditionCheck() {
403 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
406 transactionProxy.read(TestModel.TEST_PATH);
410 public void testExists() throws Exception {
411 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
413 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
416 doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
417 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
419 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
421 assertEquals("Exists response", false, exists);
423 doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
424 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
426 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
428 assertEquals("Exists response", true, exists);
431 @Test(expected = PrimaryNotFoundException.class)
432 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
433 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
435 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
436 return proxy.exists(TestModel.TEST_PATH);
441 @Test(expected = ReadFailedException.class)
442 public void testExistsWithInvalidReplyMessageType() throws Exception {
443 setupActorContextWithInitialCreateTransaction(READ_ONLY);
445 doReturn(Futures.successful(new Object())).when(mockActorContext).
446 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
448 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
451 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
454 @Test(expected = TestException.class)
455 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
456 setupActorContextWithInitialCreateTransaction(READ_ONLY);
458 doReturn(Futures.failed(new TestException())).when(mockActorContext).
459 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
461 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
464 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
467 @Test(expected = TestException.class)
468 public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
469 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
471 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
473 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
474 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
476 doReturn(Futures.failed(new TestException())).when(mockActorContext).
477 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
480 doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
481 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
483 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
486 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
488 transactionProxy.delete(TestModel.TEST_PATH);
491 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
493 verify(mockActorContext, times(0)).executeRemoteOperationAsync(
494 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
499 public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
500 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
502 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
504 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
505 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
507 doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
508 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
510 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
513 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
515 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
517 assertEquals("Exists response", true, exists);
// Expects exists() on the constructed proxy to be rejected with IllegalStateException.
// NOTE(review): the transaction type passed to the constructor is not visible in this
// excerpt; presumably a type that does not permit reads - confirm against the full file.
// NOTE(review): the method name looks like a typo for "testExistsPreConditionCheck".
520 @Test(expected=IllegalStateException.class)
521 public void testxistsPreConditionCheck() {
523 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
526 transactionProxy.exists(TestModel.TEST_PATH);
529 private void verifyRecordingOperationFutures(List<Future<Object>> futures,
530 Class<?>... expResultTypes) throws Exception {
531 assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
534 for( Future<Object> future: futures) {
535 assertNotNull("Recording operation Future is null", future);
537 Class<?> expResultType = expResultTypes[i++];
538 if(Throwable.class.isAssignableFrom(expResultType)) {
540 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
541 fail("Expected exception from recording operation Future");
542 } catch(Exception e) {
546 assertEquals("Recording operation Future result type", expResultType,
547 Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
553 public void testWrite() throws Exception {
554 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
556 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
558 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
559 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
561 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
564 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
566 verify(mockActorContext).executeRemoteOperationAsync(
567 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
569 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
570 WriteDataReply.SERIALIZABLE_CLASS);
// Expects write() on the constructed proxy to be rejected with IllegalStateException.
// NOTE(review): the transaction type passed to the constructor is not visible in this
// excerpt; presumably a type that does not permit writes - confirm against the full file.
573 @Test(expected=IllegalStateException.class)
574 public void testWritePreConditionCheck() {
576 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
579 transactionProxy.write(TestModel.TEST_PATH,
580 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Expects write() to be rejected with IllegalStateException once ready() has been called.
// NOTE(review): the transaction type passed to the constructor is not visible in this
// excerpt - confirm against the full file.
583 @Test(expected=IllegalStateException.class)
584 public void testWriteAfterReadyPreConditionCheck() {
586 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
589 transactionProxy.ready();
591 transactionProxy.write(TestModel.TEST_PATH,
592 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
596 public void testMerge() throws Exception {
597 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
599 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
601 doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
602 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
604 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
607 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
609 verify(mockActorContext).executeRemoteOperationAsync(
610 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
612 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
613 MergeDataReply.SERIALIZABLE_CLASS);
617 public void testDelete() throws Exception {
618 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
620 doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
621 eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
623 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
626 transactionProxy.delete(TestModel.TEST_PATH);
628 verify(mockActorContext).executeRemoteOperationAsync(
629 eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
631 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
632 DeleteDataReply.SERIALIZABLE_CLASS);
635 private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
636 Object... expReplies) throws Exception {
637 assertEquals("getReadyOperationFutures size", expReplies.length,
638 proxy.getCohortPathFutures().size());
641 for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
642 assertNotNull("Ready operation Future is null", future);
644 Object expReply = expReplies[i++];
645 if(expReply instanceof ActorPath) {
646 ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
647 assertEquals("Cohort actor path", expReply, actual);
649 // Expecting exception.
651 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
652 fail("Expected exception from ready operation Future");
653 } catch(Exception e) {
660 @SuppressWarnings("unchecked")
662 public void testReady() throws Exception {
663 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
665 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
667 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
668 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
670 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
671 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
673 doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
674 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
676 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
679 transactionProxy.read(TestModel.TEST_PATH);
681 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
683 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
685 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
687 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
689 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
690 WriteDataReply.SERIALIZABLE_CLASS);
692 verifyCohortPathFutures(proxy, actorRef.path());
695 @SuppressWarnings("unchecked")
697 public void testReadyWithRecordingOperationFailure() throws Exception {
698 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
700 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
702 doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
703 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
705 doReturn(Futures.failed(new TestException())).when(mockActorContext).
706 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
709 doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
710 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
712 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
715 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
717 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
719 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
721 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
723 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
725 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
726 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
728 verifyCohortPathFutures(proxy, TestException.class);
731 @SuppressWarnings("unchecked")
733 public void testReadyWithReplyFailure() throws Exception {
734 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
736 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
738 doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
739 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
741 doReturn(Futures.failed(new TestException())).when(mockActorContext).
742 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
743 isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
745 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
748 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
750 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
752 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
754 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
756 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
757 MergeDataReply.SERIALIZABLE_CLASS);
759 verifyCohortPathFutures(proxy, TestException.class);
// Makes every shard operation throw PrimaryNotFoundException, performs merge, write and
// delete, readies the proxy and expects the single cohort path future to fail.
// NOTE(review): the transaction type passed to the constructor is not visible in this
// excerpt - confirm against the full file.
763 public void testReadyWithInitialCreateTransactionFailure() throws Exception {
765 doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
766 anyString(), any(), anyDuration());
768 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
771 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
773 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
775 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
777 transactionProxy.delete(TestModel.TEST_PATH);
779 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
781 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
783 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
// Any non-ActorPath argument makes verifyCohortPathFutures expect a failed future.
785 verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
788 @SuppressWarnings("unchecked")
790 public void testReadyWithInvalidReplyMessageType() throws Exception {
791 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
793 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
795 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
796 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
798 doReturn(Futures.successful(new Object())).when(mockActorContext).
799 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
800 isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
802 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
805 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
807 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
809 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
811 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
813 verifyCohortPathFutures(proxy, IllegalArgumentException.class);
817 public void testGetIdentifier() {
818 setupActorContextWithInitialCreateTransaction(READ_ONLY);
819 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
820 TransactionProxy.TransactionType.READ_ONLY);
822 Object id = transactionProxy.getIdentifier();
823 assertNotNull("getIdentifier returned null", id);
824 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
827 @SuppressWarnings("unchecked")
829 public void testClose() throws Exception{
830 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
832 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
833 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
835 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
838 transactionProxy.read(TestModel.TEST_PATH);
840 transactionProxy.close();
842 verify(mockActorContext).sendRemoteOperationAsync(
843 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));