1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertTrue;
4 import static org.junit.Assert.assertEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.fail;
7 import akka.actor.ActorPath;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.Props;
11 import akka.dispatch.Futures;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
16 import org.junit.Before;
17 import org.junit.Test;
18 import org.mockito.ArgumentMatcher;
19 import org.mockito.Mock;
20 import org.mockito.MockitoAnnotations;
22 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
24 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
26 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
27 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
28 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
29 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
32 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
34 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
36 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
42 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
43 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
44 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
45 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
46 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
47 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
50 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 import scala.concurrent.Await;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import scala.concurrent.duration.FiniteDuration;
61 import java.util.List;
62 import java.util.concurrent.TimeUnit;
64 import static org.mockito.Matchers.any;
65 import static org.mockito.Matchers.anyString;
66 import static org.mockito.Mockito.doReturn;
67 import static org.mockito.Mockito.doThrow;
68 import static org.mockito.Mockito.argThat;
69 import static org.mockito.Mockito.eq;
70 import static org.mockito.Mockito.verify;
71 import static org.mockito.Mockito.isA;
72 import static org.mockito.Mockito.times;
74 @SuppressWarnings("resource")
75 public class TransactionProxyTest extends AbstractActorTest {
77 @SuppressWarnings("serial")
78 static class TestException extends RuntimeException {
81 static interface Invoker {
82 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
// Configuration instance passed to ShardStrategyFactory.setConfiguration during setup.
85 private final Configuration configuration = new MockConfiguration();
// ActorContext that the tests stub (doReturn/doThrow) and verify with Mockito.
// NOTE(review): presumably annotated with @Mock on a line not visible here; confirm.
88 private ActorContext mockActorContext;
// Schema context assigned from TestModel.createTestContext() during setup.
90 private SchemaContext schemaContext;
// Member name the mocked ActorContext reports; transaction ids are expected to start with it.
92 String memberName = "mock-member";
// Per-test initialization. NOTE(review): the enclosing method header is not visible here;
// presumably a JUnit @Before method; confirm.
// Initializes the Mockito-annotated fields of this test instance.
96 MockitoAnnotations.initMocks(this);
98 schemaContext = TestModel.createTestContext();
// Default stubs: the mocked ActorContext returns the test actor system and the fixed member name.
100 doReturn(getSystem()).when(mockActorContext).getActorSystem();
101 doReturn(memberName).when(mockActorContext).getCurrentMemberName();
// Hands the mock configuration to the (static) ShardStrategyFactory.
103 ShardStrategyFactory.setConfiguration(configuration);
106 private CreateTransaction eqCreateTransaction(final String memberName,
107 final TransactionType type) {
108 ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
110 public boolean matches(Object argument) {
111 CreateTransaction obj = CreateTransaction.fromSerializable(argument);
112 return obj.getTransactionId().startsWith(memberName) &&
113 obj.getTransactionType() == type.ordinal();
117 return argThat(matcher);
120 private DataExists eqDataExists() {
121 ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
123 public boolean matches(Object argument) {
124 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
125 DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
129 return argThat(matcher);
132 private ReadData eqReadData() {
133 ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
135 public boolean matches(Object argument) {
136 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
137 ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
141 return argThat(matcher);
144 private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
145 ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
147 public boolean matches(Object argument) {
148 if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
152 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
153 return obj.getPath().equals(TestModel.TEST_PATH) &&
154 obj.getData().equals(nodeToWrite);
158 return argThat(matcher);
161 private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
162 ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
164 public boolean matches(Object argument) {
165 if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
169 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
170 return obj.getPath().equals(TestModel.TEST_PATH) &&
171 obj.getData().equals(nodeToWrite);
175 return argThat(matcher);
178 private DeleteData eqDeleteData() {
179 ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
181 public boolean matches(Object argument) {
182 return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
183 DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
187 return argThat(matcher);
190 private Future<Object> readyTxReply(ActorPath path) {
191 return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
194 private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
195 return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
198 private Future<Object> dataExistsReply(boolean exists) {
199 return Futures.successful(new DataExistsReply(exists).toSerializable());
202 private Future<Object> writeDataReply() {
203 return Futures.successful(new WriteDataReply().toSerializable());
206 private Future<Object> mergeDataReply() {
207 return Futures.successful(new MergeDataReply().toSerializable());
210 private Future<Object> deleteDataReply() {
211 return Futures.successful(new DeleteDataReply().toSerializable());
214 private ActorSelection actorSelection(ActorRef actorRef) {
215 return getSystem().actorSelection(actorRef.path());
218 private FiniteDuration anyDuration() {
219 return any(FiniteDuration.class);
222 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
223 return CreateTransactionReply.newBuilder()
224 .setTransactionActorPath(actorRef.path().toString())
225 .setTransactionId("txn-1").build();
228 private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
229 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
230 doReturn(getSystem().actorSelection(actorRef.path())).
231 when(mockActorContext).actorSelection(actorRef.path().toString());
232 doReturn(createTransactionReply(actorRef)).when(mockActorContext).
233 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
234 eqCreateTransaction(memberName, type), anyDuration());
235 doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
236 anyString(), eq(actorRef.path().toString()));
237 doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
242 private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
246 future.checkedGet(5, TimeUnit.SECONDS);
247 fail("Expected ReadFailedException");
248 } catch(ReadFailedException e) {
254 public void testRead() throws Exception {
255 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
257 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
258 READ_ONLY, schemaContext);
260 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
261 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
263 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
264 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
266 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
268 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
270 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
271 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
273 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
275 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
277 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
280 @Test(expected = ReadFailedException.class)
281 public void testReadWithInvalidReplyMessageType() throws Exception {
282 setupActorContextWithInitialCreateTransaction(READ_ONLY);
284 doReturn(Futures.successful(new Object())).when(mockActorContext).
285 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
287 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
288 READ_ONLY, schemaContext);
290 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
293 @Test(expected = TestException.class)
294 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
295 setupActorContextWithInitialCreateTransaction(READ_ONLY);
297 doReturn(Futures.failed(new TestException())).when(mockActorContext).
298 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
300 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
301 READ_ONLY, schemaContext);
303 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
306 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
309 doThrow(exToThrow).when(mockActorContext).executeShardOperation(
310 anyString(), any(), anyDuration());
312 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
313 READ_ONLY, schemaContext);
315 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
318 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
319 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
321 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
322 return proxy.read(TestModel.TEST_PATH);
327 @Test(expected = PrimaryNotFoundException.class)
328 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
329 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
332 @Test(expected = TimeoutException.class)
333 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
334 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
335 new Exception("reason")));
338 @Test(expected = TestException.class)
339 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
340 testReadWithExceptionOnInitialCreateTransaction(new TestException());
343 @Test(expected = TestException.class)
344 public void testReadWithPriorRecordingOperationFailure() throws Throwable {
345 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
347 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
349 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
350 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
352 doReturn(Futures.failed(new TestException())).when(mockActorContext).
353 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
356 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
357 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
359 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
360 READ_WRITE, schemaContext);
362 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
364 transactionProxy.delete(TestModel.TEST_PATH);
367 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
369 verify(mockActorContext, times(0)).executeRemoteOperationAsync(
370 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
375 public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
376 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
378 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
380 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
381 eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
383 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
384 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
386 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
387 READ_WRITE, schemaContext);
389 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
391 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
392 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
394 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
396 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
399 @Test(expected=IllegalStateException.class)
400 public void testReadPreConditionCheck() {
402 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
403 WRITE_ONLY, schemaContext);
405 transactionProxy.read(TestModel.TEST_PATH);
409 public void testExists() throws Exception {
410 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
412 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
413 READ_ONLY, schemaContext);
415 doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
416 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
418 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
420 assertEquals("Exists response", false, exists);
422 doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
423 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
425 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
427 assertEquals("Exists response", true, exists);
430 @Test(expected = PrimaryNotFoundException.class)
431 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
432 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
434 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
435 return proxy.exists(TestModel.TEST_PATH);
440 @Test(expected = ReadFailedException.class)
441 public void testExistsWithInvalidReplyMessageType() throws Exception {
442 setupActorContextWithInitialCreateTransaction(READ_ONLY);
444 doReturn(Futures.successful(new Object())).when(mockActorContext).
445 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
447 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
448 READ_ONLY, schemaContext);
450 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
453 @Test(expected = TestException.class)
454 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
455 setupActorContextWithInitialCreateTransaction(READ_ONLY);
457 doReturn(Futures.failed(new TestException())).when(mockActorContext).
458 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
460 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
461 READ_ONLY, schemaContext);
463 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
466 @Test(expected = TestException.class)
467 public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
468 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
470 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
472 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
473 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
475 doReturn(Futures.failed(new TestException())).when(mockActorContext).
476 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
479 doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
480 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
482 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
483 READ_WRITE, schemaContext);
485 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
487 transactionProxy.delete(TestModel.TEST_PATH);
490 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
492 verify(mockActorContext, times(0)).executeRemoteOperationAsync(
493 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
498 public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
499 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
501 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
503 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
504 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
506 doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
507 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
509 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
510 READ_WRITE, schemaContext);
512 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
514 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
516 assertEquals("Exists response", true, exists);
519 @Test(expected=IllegalStateException.class)
520 public void testxistsPreConditionCheck() {
522 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
523 WRITE_ONLY, schemaContext);
525 transactionProxy.exists(TestModel.TEST_PATH);
// Checks the futures recorded for a transaction's operations against the expected result types,
// position by position. An expected type assignable to Throwable means "this future must fail";
// any other type must equal the class of the future's result. Each wait is limited to 5 seconds.
// NOTE(review): several short lines of this method (index declaration, try/else/closing braces,
// catch body) are not visible in this view; comments below describe only the visible statements.
528 private void verifyRecordingOperationFutures(List<Future<Object>> futures,
529 Class<?>... expResultTypes) throws Exception {
// The number of recorded futures must equal the number of expected types.
530 assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
533 for( Future<Object> future: futures) {
534 assertNotNull("Recording operation Future is null", future);
// Expected types are consumed in the same order as the futures.
536 Class<?> expResultType = expResultTypes[i++];
537 if(Throwable.class.isAssignableFrom(expResultType)) {
// Failure expected: reaching fail() means the future completed normally.
539 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
540 fail("Expected exception from recording operation Future");
// NOTE(review): the handler body is not visible; presumably it only acknowledges the expected
// exception - confirm. The AssertionError raised by fail() is not an Exception, so this catch
// does not intercept it.
541 } catch(Exception e) {
// Success expected: compare the class of the awaited result with the expected type.
545 assertEquals("Recording operation Future result type", expResultType,
546 Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
552 public void testWrite() throws Exception {
553 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
555 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
557 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
558 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
560 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
561 WRITE_ONLY, schemaContext);
563 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
565 verify(mockActorContext).executeRemoteOperationAsync(
566 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
568 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
569 WriteDataReply.SERIALIZABLE_CLASS);
572 @Test(expected=IllegalStateException.class)
573 public void testWritePreConditionCheck() {
575 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
576 READ_ONLY, schemaContext);
578 transactionProxy.write(TestModel.TEST_PATH,
579 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
582 @Test(expected=IllegalStateException.class)
583 public void testWriteAfterReadyPreConditionCheck() {
585 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
586 WRITE_ONLY, schemaContext);
588 transactionProxy.ready();
590 transactionProxy.write(TestModel.TEST_PATH,
591 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
595 public void testMerge() throws Exception {
596 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
598 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
600 doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
601 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
603 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
604 WRITE_ONLY, schemaContext);
606 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
608 verify(mockActorContext).executeRemoteOperationAsync(
609 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
611 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
612 MergeDataReply.SERIALIZABLE_CLASS);
616 public void testDelete() throws Exception {
617 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
619 doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
620 eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
622 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
623 WRITE_ONLY, schemaContext);
625 transactionProxy.delete(TestModel.TEST_PATH);
627 verify(mockActorContext).executeRemoteOperationAsync(
628 eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
630 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
631 DeleteDataReply.SERIALIZABLE_CLASS);
// Checks the cohort path futures held by a ThreePhaseCommitCohortProxy against the expected
// replies, position by position. An expected reply that is an ActorPath must equal the future's
// result; any other expected value (callers pass exception classes) means the future must fail.
// Each wait is limited to 5 seconds.
// NOTE(review): several short lines of this method (index declaration, else/try/closing braces,
// catch body) are not visible in this view; comments below describe only the visible statements.
// The visible code never compares the thrown exception with the expected class.
634 private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
635 Object... expReplies) throws Exception {
// The number of cohort path futures must equal the number of expected replies.
636 assertEquals("getReadyOperationFutures size", expReplies.length,
637 proxy.getCohortPathFutures().size());
640 for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
641 assertNotNull("Ready operation Future is null", future);
// Expected replies are consumed in the same order as the futures.
643 Object expReply = expReplies[i++];
644 if(expReply instanceof ActorPath) {
645 ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
646 assertEquals("Cohort actor path", expReply, actual);
648 // Expecting exception.
// Reaching fail() means the future completed normally although a failure was expected.
650 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
651 fail("Expected exception from ready operation Future");
// NOTE(review): the handler body is not visible; presumably it only acknowledges the expected
// exception - confirm.
652 } catch(Exception e) {
659 @SuppressWarnings("unchecked")
661 public void testReady() throws Exception {
662 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
664 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
666 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
667 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
669 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
670 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
672 doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
673 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
675 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
676 READ_WRITE, schemaContext);
678 transactionProxy.read(TestModel.TEST_PATH);
680 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
682 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
684 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
686 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
688 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
689 WriteDataReply.SERIALIZABLE_CLASS);
691 verifyCohortPathFutures(proxy, actorRef.path());
694 @SuppressWarnings("unchecked")
696 public void testReadyWithRecordingOperationFailure() throws Exception {
697 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
699 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
701 doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
702 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
704 doReturn(Futures.failed(new TestException())).when(mockActorContext).
705 executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
708 doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
709 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
711 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
712 WRITE_ONLY, schemaContext);
714 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
716 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
718 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
720 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
722 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
724 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
725 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
727 verifyCohortPathFutures(proxy, TestException.class);
730 @SuppressWarnings("unchecked")
732 public void testReadyWithReplyFailure() throws Exception {
733 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
735 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
737 doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
738 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
740 doReturn(Futures.failed(new TestException())).when(mockActorContext).
741 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
742 isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
744 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
745 WRITE_ONLY, schemaContext);
747 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
749 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
751 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
753 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
755 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
756 MergeDataReply.SERIALIZABLE_CLASS);
758 verifyCohortPathFutures(proxy, TestException.class);
762 public void testReadyWithInitialCreateTransactionFailure() throws Exception {
764 doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
765 anyString(), any(), anyDuration());
767 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
768 WRITE_ONLY, schemaContext);
770 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
772 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
774 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
776 transactionProxy.delete(TestModel.TEST_PATH);
778 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
780 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
782 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
784 verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
787 @SuppressWarnings("unchecked")
789 public void testReadyWithInvalidReplyMessageType() throws Exception {
790 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
792 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
794 doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
795 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
797 doReturn(Futures.successful(new Object())).when(mockActorContext).
798 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
799 isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
801 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
802 WRITE_ONLY, schemaContext);
804 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
806 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
808 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
810 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
812 verifyCohortPathFutures(proxy, IllegalArgumentException.class);
816 public void testGetIdentifier() {
817 setupActorContextWithInitialCreateTransaction(READ_ONLY);
818 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
819 TransactionProxy.TransactionType.READ_ONLY, schemaContext);
821 Object id = transactionProxy.getIdentifier();
822 assertNotNull("getIdentifier returned null", id);
823 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
826 @SuppressWarnings("unchecked")
828 public void testClose() throws Exception{
829 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
831 doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
832 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
834 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
835 READ_WRITE, schemaContext);
837 transactionProxy.read(TestModel.TEST_PATH);
839 transactionProxy.close();
841 verify(mockActorContext).sendRemoteOperationAsync(
842 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));