1 package org.opendaylight.controller.cluster.datastore;
3 import com.google.common.util.concurrent.CheckedFuture;
5 import akka.actor.ActorRef;
6 import akka.actor.ActorSelection;
7 import akka.actor.Props;
8 import akka.dispatch.Futures;
9 import com.google.common.base.Optional;
10 import org.junit.Before;
11 import org.junit.Test;
12 import org.mockito.ArgumentMatcher;
13 import org.mockito.Mock;
14 import org.mockito.MockitoAnnotations;
15 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
16 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
17 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
20 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
21 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
22 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
23 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
24 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
25 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
31 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
32 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
33 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
36 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
37 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import scala.concurrent.Await;
45 import scala.concurrent.Future;
46 import scala.concurrent.duration.Duration;
48 import java.util.List;
49 import java.util.concurrent.TimeUnit;
51 import static org.junit.Assert.assertEquals;
52 import static org.junit.Assert.assertNotNull;
53 import static org.junit.Assert.assertTrue;
54 import static org.junit.Assert.fail;
55 import static org.mockito.Matchers.any;
56 import static org.mockito.Matchers.anyString;
57 import static org.mockito.Mockito.argThat;
58 import static org.mockito.Mockito.doReturn;
59 import static org.mockito.Mockito.doThrow;
60 import static org.mockito.Mockito.eq;
61 import static org.mockito.Mockito.isA;
62 import static org.mockito.Mockito.times;
63 import static org.mockito.Mockito.verify;
64 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
65 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
66 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
68 @SuppressWarnings("resource")
69 public class TransactionProxyTest extends AbstractActorTest {
71 @SuppressWarnings("serial")
72 static class TestException extends RuntimeException {
75 static interface Invoker {
76 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
79 private final Configuration configuration = new MockConfiguration();
82 private ActorContext mockActorContext;
84 private SchemaContext schemaContext;
86 String memberName = "mock-member";
90 MockitoAnnotations.initMocks(this);
92 schemaContext = TestModel.createTestContext();
94 doReturn(getSystem()).when(mockActorContext).getActorSystem();
95 doReturn(memberName).when(mockActorContext).getCurrentMemberName();
96 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
98 ShardStrategyFactory.setConfiguration(configuration);
101 private CreateTransaction eqCreateTransaction(final String memberName,
102 final TransactionType type) {
103 ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
105 public boolean matches(Object argument) {
106 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
107 return obj.getTransactionId().startsWith(memberName) &&
108 obj.getTransactionType() == type.ordinal();
112 return argThat(matcher);
115 private DataExists eqDataExists() {
116 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
118 public boolean matches(Object argument) {
119 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
120 DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
124 return argThat(matcher);
127 private ReadData eqReadData() {
128 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
130 public boolean matches(Object argument) {
131 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
132 ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
136 return argThat(matcher);
139 private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
140 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
142 public boolean matches(Object argument) {
143 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
147 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
148 return obj.getPath().equals(TestModel.TEST_PATH) &&
149 obj.getData().equals(nodeToWrite);
153 return argThat(matcher);
156 private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
157 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
159 public boolean matches(Object argument) {
160 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
164 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
165 return obj.getPath().equals(TestModel.TEST_PATH) &&
166 obj.getData().equals(nodeToWrite);
170 return argThat(matcher);
173 private DeleteData eqDeleteData() {
174 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
176 public boolean matches(Object argument) {
177 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
178 DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
182 return argThat(matcher);
185 private Future<Object> readyTxReply(String path) {
186 return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
189 private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
190 return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
193 private Future<Object> dataExistsReply(boolean exists) {
194 return Futures.successful(new DataExistsReply(exists).toSerializable());
197 private Future<Object> writeDataReply() {
198 return Futures.successful(new WriteDataReply().toSerializable());
201 private Future<Object> mergeDataReply() {
202 return Futures.successful(new MergeDataReply().toSerializable());
205 private Future<Object> deleteDataReply() {
206 return Futures.successful(new DeleteDataReply().toSerializable());
209 private ActorSelection actorSelection(ActorRef actorRef) {
210 return getSystem().actorSelection(actorRef.path());
213 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
214 return CreateTransactionReply.newBuilder()
215 .setTransactionActorPath(actorRef.path().toString())
216 .setTransactionId("txn-1").build();
219 private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
220 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
221 doReturn(getSystem().actorSelection(actorRef.path())).
222 when(mockActorContext).actorSelection(actorRef.path().toString());
224 doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
225 when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
227 doReturn(createTransactionReply(actorRef)).when(mockActorContext).
228 executeOperation(eq(getSystem().actorSelection(actorRef.path())),
229 eqCreateTransaction(memberName, type));
233 private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
237 future.checkedGet(5, TimeUnit.SECONDS);
238 fail("Expected ReadFailedException");
239 } catch(ReadFailedException e) {
245 public void testRead() throws Exception {
246 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
248 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
251 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
252 eq(actorSelection(actorRef)), eqReadData());
254 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
255 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
257 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
259 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
261 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
262 eq(actorSelection(actorRef)), eqReadData());
264 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
266 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
268 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
271 @Test(expected = ReadFailedException.class)
272 public void testReadWithInvalidReplyMessageType() throws Exception {
273 setupActorContextWithInitialCreateTransaction(READ_ONLY);
275 doReturn(Futures.successful(new Object())).when(mockActorContext).
276 executeOperationAsync(any(ActorSelection.class), any());
278 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
281 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
284 @Test(expected = TestException.class)
285 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
286 setupActorContextWithInitialCreateTransaction(READ_ONLY);
288 doReturn(Futures.failed(new TestException())).when(mockActorContext).
289 executeOperationAsync(any(ActorSelection.class), any());
291 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
294 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
297 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
299 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
301 if (exToThrow instanceof PrimaryNotFoundException) {
302 doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
304 doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
305 when(mockActorContext).findPrimaryShard(anyString());
307 doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any());
309 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
311 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
314 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
315 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
317 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
318 return proxy.read(TestModel.TEST_PATH);
323 @Test(expected = PrimaryNotFoundException.class)
324 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
325 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
328 @Test(expected = TimeoutException.class)
329 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
330 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
331 new Exception("reason")));
334 @Test(expected = TestException.class)
335 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
336 testReadWithExceptionOnInitialCreateTransaction(new TestException());
339 @Test(expected = TestException.class)
340 public void testReadWithPriorRecordingOperationFailure() throws Throwable {
341 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
343 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
345 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
346 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
348 doReturn(Futures.failed(new TestException())).when(mockActorContext).
349 executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
351 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
352 eq(actorSelection(actorRef)), eqReadData());
354 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
357 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
359 transactionProxy.delete(TestModel.TEST_PATH);
362 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
364 verify(mockActorContext, times(0)).executeOperationAsync(
365 eq(actorSelection(actorRef)), eqReadData());
370 public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
371 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
373 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
375 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
376 eq(actorSelection(actorRef)), eqWriteData(expectedNode));
378 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
379 eq(actorSelection(actorRef)), eqReadData());
381 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
384 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
386 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
387 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
389 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
391 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
394 @Test(expected=IllegalStateException.class)
395 public void testReadPreConditionCheck() {
397 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
400 transactionProxy.read(TestModel.TEST_PATH);
404 public void testExists() throws Exception {
405 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
407 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
410 doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
411 eq(actorSelection(actorRef)), eqDataExists());
413 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
415 assertEquals("Exists response", false, exists);
417 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
418 eq(actorSelection(actorRef)), eqDataExists());
420 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
422 assertEquals("Exists response", true, exists);
425 @Test(expected = PrimaryNotFoundException.class)
426 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
427 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
429 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
430 return proxy.exists(TestModel.TEST_PATH);
435 @Test(expected = ReadFailedException.class)
436 public void testExistsWithInvalidReplyMessageType() throws Exception {
437 setupActorContextWithInitialCreateTransaction(READ_ONLY);
439 doReturn(Futures.successful(new Object())).when(mockActorContext).
440 executeOperationAsync(any(ActorSelection.class), any());
442 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
445 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
448 @Test(expected = TestException.class)
449 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
450 setupActorContextWithInitialCreateTransaction(READ_ONLY);
452 doReturn(Futures.failed(new TestException())).when(mockActorContext).
453 executeOperationAsync(any(ActorSelection.class), any());
455 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
458 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
461 @Test(expected = TestException.class)
462 public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
463 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
465 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
467 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
468 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
470 doReturn(Futures.failed(new TestException())).when(mockActorContext).
471 executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
473 doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
474 eq(actorSelection(actorRef)), eqDataExists());
476 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
479 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
481 transactionProxy.delete(TestModel.TEST_PATH);
484 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
486 verify(mockActorContext, times(0)).executeOperationAsync(
487 eq(actorSelection(actorRef)), eqDataExists());
492 public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
493 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
495 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
497 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
498 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
500 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
501 eq(actorSelection(actorRef)), eqDataExists());
503 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
506 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
508 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
510 assertEquals("Exists response", true, exists);
513 @Test(expected=IllegalStateException.class)
514 public void testxistsPreConditionCheck() {
516 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
519 transactionProxy.exists(TestModel.TEST_PATH);
522 private void verifyRecordingOperationFutures(List<Future<Object>> futures,
523 Class<?>... expResultTypes) throws Exception {
524 assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
527 for( Future<Object> future: futures) {
528 assertNotNull("Recording operation Future is null", future);
530 Class<?> expResultType = expResultTypes[i++];
531 if(Throwable.class.isAssignableFrom(expResultType)) {
533 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
534 fail("Expected exception from recording operation Future");
535 } catch(Exception e) {
539 assertEquals("Recording operation Future result type", expResultType,
540 Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
546 public void testWrite() throws Exception {
547 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
549 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
551 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
552 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
554 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
557 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
559 verify(mockActorContext).executeOperationAsync(
560 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
562 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
563 WriteDataReply.SERIALIZABLE_CLASS);
566 @Test(expected=IllegalStateException.class)
567 public void testWritePreConditionCheck() {
569 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
572 transactionProxy.write(TestModel.TEST_PATH,
573 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
576 @Test(expected=IllegalStateException.class)
577 public void testWriteAfterReadyPreConditionCheck() {
579 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
582 transactionProxy.ready();
584 transactionProxy.write(TestModel.TEST_PATH,
585 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
589 public void testMerge() throws Exception {
590 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
592 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
594 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
595 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
597 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
600 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
602 verify(mockActorContext).executeOperationAsync(
603 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
605 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
606 MergeDataReply.SERIALIZABLE_CLASS);
610 public void testDelete() throws Exception {
611 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
613 doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
614 eq(actorSelection(actorRef)), eqDeleteData());
616 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
619 transactionProxy.delete(TestModel.TEST_PATH);
621 verify(mockActorContext).executeOperationAsync(
622 eq(actorSelection(actorRef)), eqDeleteData());
624 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
625 DeleteDataReply.SERIALIZABLE_CLASS);
628 private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
629 Object... expReplies) throws Exception {
630 assertEquals("getReadyOperationFutures size", expReplies.length,
631 proxy.getCohortFutures().size());
634 for( Future<ActorSelection> future: proxy.getCohortFutures()) {
635 assertNotNull("Ready operation Future is null", future);
637 Object expReply = expReplies[i++];
638 if(expReply instanceof ActorSelection) {
639 ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
640 assertEquals("Cohort actor path", (ActorSelection) expReply, actual);
642 // Expecting exception.
644 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
645 fail("Expected exception from ready operation Future");
646 } catch(Exception e) {
653 @SuppressWarnings("unchecked")
655 public void testReady() throws Exception {
656 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
658 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
660 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
661 eq(actorSelection(actorRef)), eqReadData());
663 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
664 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
666 doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
667 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
669 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
672 transactionProxy.read(TestModel.TEST_PATH);
674 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
676 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
678 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
680 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
682 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
683 WriteDataReply.SERIALIZABLE_CLASS);
685 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
688 @SuppressWarnings("unchecked")
690 public void testReadyWithRecordingOperationFailure() throws Exception {
691 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
693 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
695 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
696 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
698 doReturn(Futures.failed(new TestException())).when(mockActorContext).
699 executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
701 doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
702 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
704 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
707 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
709 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
711 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
713 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
715 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
717 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
718 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
720 verifyCohortFutures(proxy, TestException.class);
723 @SuppressWarnings("unchecked")
725 public void testReadyWithReplyFailure() throws Exception {
726 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
728 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
730 doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
731 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
733 doReturn(Futures.failed(new TestException())).when(mockActorContext).
734 executeOperationAsync(eq(actorSelection(actorRef)),
735 isA(ReadyTransaction.SERIALIZABLE_CLASS));
737 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
740 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
742 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
744 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
746 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
748 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
749 MergeDataReply.SERIALIZABLE_CLASS);
751 verifyCohortFutures(proxy, TestException.class);
755 public void testReadyWithInitialCreateTransactionFailure() throws Exception {
757 doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
758 // doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
759 // anyString(), any());
761 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
764 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
766 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
768 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
770 transactionProxy.delete(TestModel.TEST_PATH);
772 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
774 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
776 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
778 verifyCohortFutures(proxy, PrimaryNotFoundException.class);
781 @SuppressWarnings("unchecked")
783 public void testReadyWithInvalidReplyMessageType() throws Exception {
784 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
786 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
788 doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
789 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
791 doReturn(Futures.successful(new Object())).when(mockActorContext).
792 executeOperationAsync(eq(actorSelection(actorRef)),
793 isA(ReadyTransaction.SERIALIZABLE_CLASS));
795 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
798 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
800 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
802 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
804 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
806 verifyCohortFutures(proxy, IllegalArgumentException.class);
810 public void testGetIdentifier() {
811 setupActorContextWithInitialCreateTransaction(READ_ONLY);
812 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
813 TransactionProxy.TransactionType.READ_ONLY);
815 Object id = transactionProxy.getIdentifier();
816 assertNotNull("getIdentifier returned null", id);
817 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
820 @SuppressWarnings("unchecked")
822 public void testClose() throws Exception{
823 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
825 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
826 eq(actorSelection(actorRef)), eqReadData());
828 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
831 transactionProxy.read(TestModel.TEST_PATH);
833 transactionProxy.close();
835 verify(mockActorContext).sendOperationAsync(
836 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));