1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertTrue;
4 import static org.junit.Assert.assertEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.fail;
7 import akka.actor.ActorPath;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.Props;
11 import akka.dispatch.Futures;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
16 import org.junit.Before;
17 import org.junit.Test;
18 import org.mockito.ArgumentMatcher;
19 import org.mockito.Mock;
20 import org.mockito.MockitoAnnotations;
22 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
24 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
26 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
27 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
28 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
29 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
32 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
34 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
36 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
42 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
43 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
44 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
45 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
46 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
47 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
50 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 import scala.concurrent.Await;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import java.util.List;
60 import java.util.concurrent.TimeUnit;
62 import static org.mockito.Matchers.any;
63 import static org.mockito.Matchers.anyString;
64 import static org.mockito.Mockito.doReturn;
65 import static org.mockito.Mockito.doThrow;
66 import static org.mockito.Mockito.argThat;
67 import static org.mockito.Mockito.eq;
68 import static org.mockito.Mockito.verify;
69 import static org.mockito.Mockito.isA;
70 import static org.mockito.Mockito.times;
72 @SuppressWarnings("resource")
73 public class TransactionProxyTest extends AbstractActorTest {
75 @SuppressWarnings("serial")
76 static class TestException extends RuntimeException {
79 static interface Invoker {
80 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
83 private final Configuration configuration = new MockConfiguration();
86 private ActorContext mockActorContext;
88 private SchemaContext schemaContext;
90 String memberName = "mock-member";
94 MockitoAnnotations.initMocks(this);
96 schemaContext = TestModel.createTestContext();
98 doReturn(getSystem()).when(mockActorContext).getActorSystem();
99 doReturn(memberName).when(mockActorContext).getCurrentMemberName();
100 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
102 ShardStrategyFactory.setConfiguration(configuration);
105 private CreateTransaction eqCreateTransaction(final String memberName,
106 final TransactionType type) {
107 ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
109 public boolean matches(Object argument) {
110 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
111 return obj.getTransactionId().startsWith(memberName) &&
112 obj.getTransactionType() == type.ordinal();
116 return argThat(matcher);
119 private DataExists eqDataExists() {
120 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
122 public boolean matches(Object argument) {
123 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
124 DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
128 return argThat(matcher);
131 private ReadData eqReadData() {
132 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
134 public boolean matches(Object argument) {
135 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
136 ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
140 return argThat(matcher);
143 private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
144 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
146 public boolean matches(Object argument) {
147 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
151 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
152 return obj.getPath().equals(TestModel.TEST_PATH) &&
153 obj.getData().equals(nodeToWrite);
157 return argThat(matcher);
160 private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
161 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
163 public boolean matches(Object argument) {
164 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
168 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
169 return obj.getPath().equals(TestModel.TEST_PATH) &&
170 obj.getData().equals(nodeToWrite);
174 return argThat(matcher);
177 private DeleteData eqDeleteData() {
178 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
180 public boolean matches(Object argument) {
181 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
182 DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
186 return argThat(matcher);
189 private Future<Object> readyTxReply(ActorPath path) {
190 return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
193 private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
194 return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
197 private Future<Object> dataExistsReply(boolean exists) {
198 return Futures.successful(new DataExistsReply(exists).toSerializable());
201 private Future<Object> writeDataReply() {
202 return Futures.successful(new WriteDataReply().toSerializable());
205 private Future<Object> mergeDataReply() {
206 return Futures.successful(new MergeDataReply().toSerializable());
209 private Future<Object> deleteDataReply() {
210 return Futures.successful(new DeleteDataReply().toSerializable());
213 private ActorSelection actorSelection(ActorRef actorRef) {
214 return getSystem().actorSelection(actorRef.path());
217 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
218 return CreateTransactionReply.newBuilder()
219 .setTransactionActorPath(actorRef.path().toString())
220 .setTransactionId("txn-1").build();
223 private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
224 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
225 doReturn(getSystem().actorSelection(actorRef.path())).
226 when(mockActorContext).actorSelection(actorRef.path().toString());
227 doReturn(createTransactionReply(actorRef)).when(mockActorContext).
228 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
229 eqCreateTransaction(memberName, type));
230 doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
231 anyString(), eq(actorRef.path().toString()));
232 doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
237 private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
241 future.checkedGet(5, TimeUnit.SECONDS);
242 fail("Expected ReadFailedException");
243 } catch(ReadFailedException e) {
249 public void testRead() throws Exception {
250 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
252 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
255 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
256 eq(actorSelection(actorRef)), eqReadData());
258 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
259 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
261 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
263 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
265 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
266 eq(actorSelection(actorRef)), eqReadData());
268 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
270 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
272 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
275 @Test(expected = ReadFailedException.class)
276 public void testReadWithInvalidReplyMessageType() throws Exception {
277 setupActorContextWithInitialCreateTransaction(READ_ONLY);
279 doReturn(Futures.successful(new Object())).when(mockActorContext).
280 executeRemoteOperationAsync(any(ActorSelection.class), any());
282 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
285 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
288 @Test(expected = TestException.class)
289 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
290 setupActorContextWithInitialCreateTransaction(READ_ONLY);
292 doReturn(Futures.failed(new TestException())).when(mockActorContext).
293 executeRemoteOperationAsync(any(ActorSelection.class), any());
295 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
298 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
301 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
304 doThrow(exToThrow).when(mockActorContext).executeShardOperation(
307 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
310 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
313 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
314 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
316 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
317 return proxy.read(TestModel.TEST_PATH);
322 @Test(expected = PrimaryNotFoundException.class)
323 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
324 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
327 @Test(expected = TimeoutException.class)
328 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
329 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
330 new Exception("reason")));
333 @Test(expected = TestException.class)
334 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
335 testReadWithExceptionOnInitialCreateTransaction(new TestException());
338 @Test(expected = TestException.class)
339 public void testReadWithPriorRecordingOperationFailure() throws Throwable {
340 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
342 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
344 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
345 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
347 doReturn(Futures.failed(new TestException())).when(mockActorContext).
348 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
350 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
351 eq(actorSelection(actorRef)), eqReadData());
353 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
356 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
358 transactionProxy.delete(TestModel.TEST_PATH);
361 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
363 verify(mockActorContext, times(0)).executeRemoteOperationAsync(
364 eq(actorSelection(actorRef)), eqReadData());
369 public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
370 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
372 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
374 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
375 eq(actorSelection(actorRef)), eqWriteData(expectedNode));
377 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
378 eq(actorSelection(actorRef)), eqReadData());
380 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
383 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
385 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
386 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
388 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
390 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
393 @Test(expected=IllegalStateException.class)
394 public void testReadPreConditionCheck() {
396 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
399 transactionProxy.read(TestModel.TEST_PATH);
403 public void testExists() throws Exception {
404 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
406 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
409 doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
410 eq(actorSelection(actorRef)), eqDataExists());
412 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
414 assertEquals("Exists response", false, exists);
416 doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
417 eq(actorSelection(actorRef)), eqDataExists());
419 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
421 assertEquals("Exists response", true, exists);
424 @Test(expected = PrimaryNotFoundException.class)
425 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
426 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
428 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
429 return proxy.exists(TestModel.TEST_PATH);
434 @Test(expected = ReadFailedException.class)
435 public void testExistsWithInvalidReplyMessageType() throws Exception {
436 setupActorContextWithInitialCreateTransaction(READ_ONLY);
438 doReturn(Futures.successful(new Object())).when(mockActorContext).
439 executeRemoteOperationAsync(any(ActorSelection.class), any());
441 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
444 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
447 @Test(expected = TestException.class)
448 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
449 setupActorContextWithInitialCreateTransaction(READ_ONLY);
451 doReturn(Futures.failed(new TestException())).when(mockActorContext).
452 executeRemoteOperationAsync(any(ActorSelection.class), any());
454 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
457 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
460 @Test(expected = TestException.class)
461 public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
462 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
464 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
466 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
467 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
469 doReturn(Futures.failed(new TestException())).when(mockActorContext).
470 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
472 doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
473 eq(actorSelection(actorRef)), eqDataExists());
475 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
478 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
480 transactionProxy.delete(TestModel.TEST_PATH);
483 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
485 verify(mockActorContext, times(0)).executeRemoteOperationAsync(
486 eq(actorSelection(actorRef)), eqDataExists());
491 public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
492 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
494 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
496 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
497 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
499 doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
500 eq(actorSelection(actorRef)), eqDataExists());
502 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
505 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
507 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
509 assertEquals("Exists response", true, exists);
512 @Test(expected=IllegalStateException.class)
513 public void testxistsPreConditionCheck() {
515 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
518 transactionProxy.exists(TestModel.TEST_PATH);
521 private void verifyRecordingOperationFutures(List<Future<Object>> futures,
522 Class<?>... expResultTypes) throws Exception {
523 assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
526 for( Future<Object> future: futures) {
527 assertNotNull("Recording operation Future is null", future);
529 Class<?> expResultType = expResultTypes[i++];
530 if(Throwable.class.isAssignableFrom(expResultType)) {
532 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
533 fail("Expected exception from recording operation Future");
534 } catch(Exception e) {
538 assertEquals("Recording operation Future result type", expResultType,
539 Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
545 public void testWrite() throws Exception {
546 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
548 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
550 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
551 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
553 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
556 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
558 verify(mockActorContext).executeRemoteOperationAsync(
559 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
561 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
562 WriteDataReply.SERIALIZABLE_CLASS);
565 @Test(expected=IllegalStateException.class)
566 public void testWritePreConditionCheck() {
568 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
571 transactionProxy.write(TestModel.TEST_PATH,
572 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
575 @Test(expected=IllegalStateException.class)
576 public void testWriteAfterReadyPreConditionCheck() {
578 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
581 transactionProxy.ready();
583 transactionProxy.write(TestModel.TEST_PATH,
584 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
588 public void testMerge() throws Exception {
589 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
591 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
593 doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
594 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
596 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
599 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
601 verify(mockActorContext).executeRemoteOperationAsync(
602 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
604 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
605 MergeDataReply.SERIALIZABLE_CLASS);
609 public void testDelete() throws Exception {
610 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
612 doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
613 eq(actorSelection(actorRef)), eqDeleteData());
615 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
618 transactionProxy.delete(TestModel.TEST_PATH);
620 verify(mockActorContext).executeRemoteOperationAsync(
621 eq(actorSelection(actorRef)), eqDeleteData());
623 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
624 DeleteDataReply.SERIALIZABLE_CLASS);
627 private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
628 Object... expReplies) throws Exception {
629 assertEquals("getReadyOperationFutures size", expReplies.length,
630 proxy.getCohortPathFutures().size());
633 for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
634 assertNotNull("Ready operation Future is null", future);
636 Object expReply = expReplies[i++];
637 if(expReply instanceof ActorPath) {
638 ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
639 assertEquals("Cohort actor path", expReply, actual);
641 // Expecting exception.
643 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
644 fail("Expected exception from ready operation Future");
645 } catch(Exception e) {
652 @SuppressWarnings("unchecked")
654 public void testReady() throws Exception {
655 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
657 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
659 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
660 eq(actorSelection(actorRef)), eqReadData());
662 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
663 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
665 doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
666 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
668 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
671 transactionProxy.read(TestModel.TEST_PATH);
673 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
675 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
677 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
679 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
681 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
682 WriteDataReply.SERIALIZABLE_CLASS);
684 verifyCohortPathFutures(proxy, actorRef.path());
687 @SuppressWarnings("unchecked")
689 public void testReadyWithRecordingOperationFailure() throws Exception {
690 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
692 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
694 doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
695 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
697 doReturn(Futures.failed(new TestException())).when(mockActorContext).
698 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
700 doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
701 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
703 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
706 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
708 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
710 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
712 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
714 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
716 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
717 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
719 verifyCohortPathFutures(proxy, TestException.class);
722 @SuppressWarnings("unchecked")
724 public void testReadyWithReplyFailure() throws Exception {
725 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
727 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
729 doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
730 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
732 doReturn(Futures.failed(new TestException())).when(mockActorContext).
733 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
734 isA(ReadyTransaction.SERIALIZABLE_CLASS));
736 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
739 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
741 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
743 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
745 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
747 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
748 MergeDataReply.SERIALIZABLE_CLASS);
750 verifyCohortPathFutures(proxy, TestException.class);
754 public void testReadyWithInitialCreateTransactionFailure() throws Exception {
756 doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
759 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
762 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
764 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
766 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
768 transactionProxy.delete(TestModel.TEST_PATH);
770 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
772 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
774 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
776 verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
779 @SuppressWarnings("unchecked")
781 public void testReadyWithInvalidReplyMessageType() throws Exception {
782 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
784 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
786 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
787 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
789 doReturn(Futures.successful(new Object())).when(mockActorContext).
790 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
791 isA(ReadyTransaction.SERIALIZABLE_CLASS));
793 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
796 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
798 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
800 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
802 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
804 verifyCohortPathFutures(proxy, IllegalArgumentException.class);
808 public void testGetIdentifier() {
809 setupActorContextWithInitialCreateTransaction(READ_ONLY);
810 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
811 TransactionProxy.TransactionType.READ_ONLY);
813 Object id = transactionProxy.getIdentifier();
814 assertNotNull("getIdentifier returned null", id);
815 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
818 @SuppressWarnings("unchecked")
820 public void testClose() throws Exception{
821 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
823 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
824 eq(actorSelection(actorRef)), eqReadData());
826 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
829 transactionProxy.read(TestModel.TEST_PATH);
831 transactionProxy.close();
833 verify(mockActorContext).sendRemoteOperationAsync(
834 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));