1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.eq;
10 import static org.mockito.Matchers.isA;
11 import static org.mockito.Mockito.doReturn;
12 import static org.mockito.Mockito.times;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
15 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import com.google.common.base.Optional;
23 import com.google.common.util.concurrent.CheckedFuture;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.Assert;
31 import org.junit.Test;
32 import org.mockito.InOrder;
33 import org.mockito.Mockito;
34 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
36 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
37 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
40 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
41 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
44 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
45 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
46 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
47 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
48 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
49 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
52 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
53 import scala.concurrent.Await;
54 import scala.concurrent.Future;
55 import scala.concurrent.Promise;
56 import scala.concurrent.duration.Duration;
58 @SuppressWarnings("resource")
59 public class TransactionProxyTest extends AbstractTransactionProxyTest {
61 @SuppressWarnings("serial")
62 static class TestException extends RuntimeException {
65 static interface Invoker {
66 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
70 public void testRead() throws Exception {
71 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
73 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
75 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
76 eq(actorSelection(actorRef)), eqSerializedReadData());
78 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
79 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
81 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
83 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
85 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
86 eq(actorSelection(actorRef)), eqSerializedReadData());
88 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
90 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
92 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
95 @Test(expected = ReadFailedException.class)
96 public void testReadWithInvalidReplyMessageType() throws Exception {
97 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
99 doReturn(Futures.successful(new Object())).when(mockActorContext).
100 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
102 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
104 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
107 @Test(expected = TestException.class)
108 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
109 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
111 doReturn(Futures.failed(new TestException())).when(mockActorContext).
112 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
114 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
116 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
119 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
121 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
123 if (exToThrow instanceof PrimaryNotFoundException) {
124 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
126 doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
127 when(mockActorContext).findPrimaryShardAsync(anyString());
130 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
131 any(ActorSelection.class), any());
133 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
135 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
138 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
139 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
141 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
142 return proxy.read(TestModel.TEST_PATH);
147 @Test(expected = PrimaryNotFoundException.class)
148 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
149 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
152 @Test(expected = TimeoutException.class)
153 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
154 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
155 new Exception("reason")));
158 @Test(expected = TestException.class)
159 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
160 testReadWithExceptionOnInitialCreateTransaction(new TestException());
163 @Test(expected = TestException.class)
164 public void testReadWithPriorRecordingOperationFailure() throws Throwable {
165 doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
166 when(mockActorContext).getDatastoreContext();
168 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
170 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
172 expectFailedBatchedModifications(actorRef);
174 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
175 eq(actorSelection(actorRef)), eqSerializedReadData());
177 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
179 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
181 transactionProxy.delete(TestModel.TEST_PATH);
184 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
186 verify(mockActorContext, times(0)).executeOperationAsync(
187 eq(actorSelection(actorRef)), eqSerializedReadData());
192 public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
193 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
195 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
197 expectBatchedModifications(actorRef, 1);
199 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
200 eq(actorSelection(actorRef)), eqSerializedReadData());
202 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
204 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
206 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
207 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
209 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
210 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
212 InOrder inOrder = Mockito.inOrder(mockActorContext);
213 inOrder.verify(mockActorContext).executeOperationAsync(
214 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
216 inOrder.verify(mockActorContext).executeOperationAsync(
217 eq(actorSelection(actorRef)), eqSerializedReadData());
220 @Test(expected=IllegalStateException.class)
221 public void testReadPreConditionCheck() {
222 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
223 transactionProxy.read(TestModel.TEST_PATH);
226 @Test(expected=IllegalArgumentException.class)
227 public void testInvalidCreateTransactionReply() throws Throwable {
228 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
230 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
231 actorSelection(actorRef.path().toString());
233 doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
234 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
236 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
237 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
239 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
241 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
245 public void testExists() throws Exception {
246 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
248 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
250 doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
251 eq(actorSelection(actorRef)), eqSerializedDataExists());
253 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
255 assertEquals("Exists response", false, exists);
257 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
258 eq(actorSelection(actorRef)), eqSerializedDataExists());
260 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
262 assertEquals("Exists response", true, exists);
265 @Test(expected = PrimaryNotFoundException.class)
266 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
267 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
269 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
270 return proxy.exists(TestModel.TEST_PATH);
275 @Test(expected = ReadFailedException.class)
276 public void testExistsWithInvalidReplyMessageType() throws Exception {
277 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
279 doReturn(Futures.successful(new Object())).when(mockActorContext).
280 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
282 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
285 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
288 @Test(expected = TestException.class)
289 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
290 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
292 doReturn(Futures.failed(new TestException())).when(mockActorContext).
293 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
295 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
297 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
300 @Test(expected = TestException.class)
301 public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
302 doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
303 when(mockActorContext).getDatastoreContext();
305 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
307 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
309 expectFailedBatchedModifications(actorRef);
311 doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
312 eq(actorSelection(actorRef)), eqSerializedDataExists());
314 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
317 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
319 transactionProxy.delete(TestModel.TEST_PATH);
322 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
324 verify(mockActorContext, times(0)).executeOperationAsync(
325 eq(actorSelection(actorRef)), eqSerializedDataExists());
330 public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
331 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
333 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
335 expectBatchedModifications(actorRef, 1);
337 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
338 eq(actorSelection(actorRef)), eqSerializedDataExists());
340 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
342 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
344 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
346 assertEquals("Exists response", true, exists);
348 InOrder inOrder = Mockito.inOrder(mockActorContext);
349 inOrder.verify(mockActorContext).executeOperationAsync(
350 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
352 inOrder.verify(mockActorContext).executeOperationAsync(
353 eq(actorSelection(actorRef)), eqSerializedDataExists());
356 @Test(expected=IllegalStateException.class)
357 public void testExistsPreConditionCheck() {
358 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
359 transactionProxy.exists(TestModel.TEST_PATH);
362 private void verifyRecordingOperationFutures(List<Future<Object>> futures,
363 Class<?>... expResultTypes) throws Exception {
364 assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
367 for( Future<Object> future: futures) {
368 assertNotNull("Recording operation Future is null", future);
370 Class<?> expResultType = expResultTypes[i++];
371 if(Throwable.class.isAssignableFrom(expResultType)) {
373 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
374 fail("Expected exception from recording operation Future");
375 } catch(Exception e) {
379 assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
380 Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
386 public void testWrite() throws Exception {
387 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
389 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
391 expectBatchedModifications(actorRef, 1);
392 expectReadyTransaction(actorRef);
394 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
396 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
398 // This sends the batched modification.
399 transactionProxy.ready();
401 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
403 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
404 BatchedModificationsReply.class);
408 public void testWriteAfterAsyncRead() throws Throwable {
409 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
411 Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
412 doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
413 eq(getSystem().actorSelection(actorRef.path())),
414 eqCreateTransaction(memberName, READ_WRITE));
416 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
417 eq(actorSelection(actorRef)), eqSerializedReadData());
419 expectBatchedModifications(actorRef, 1);
420 expectReadyTransaction(actorRef);
422 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
424 final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
426 final CountDownLatch readComplete = new CountDownLatch(1);
427 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
428 com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
429 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
431 public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
433 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
434 } catch (Exception e) {
437 readComplete.countDown();
442 public void onFailure(Throwable t) {
444 readComplete.countDown();
448 createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
450 Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
452 if(caughtEx.get() != null) {
453 throw caughtEx.get();
456 // This sends the batched modification.
457 transactionProxy.ready();
459 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
461 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
462 BatchedModificationsReply.class);
465 @Test(expected=IllegalStateException.class)
466 public void testWritePreConditionCheck() {
467 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
468 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
471 @Test(expected=IllegalStateException.class)
472 public void testWriteAfterReadyPreConditionCheck() {
473 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
475 transactionProxy.ready();
477 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
481 public void testMerge() throws Exception {
482 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
484 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
486 expectBatchedModifications(actorRef, 1);
487 expectReadyTransaction(actorRef);
489 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
491 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
493 // This sends the batched modification.
494 transactionProxy.ready();
496 verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite));
498 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
499 BatchedModificationsReply.class);
503 public void testDelete() throws Exception {
504 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
506 expectBatchedModifications(actorRef, 1);
507 expectReadyTransaction(actorRef);
509 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
511 transactionProxy.delete(TestModel.TEST_PATH);
513 // This sends the batched modification.
514 transactionProxy.ready();
516 verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH));
518 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
519 BatchedModificationsReply.class);
523 public void testReady() throws Exception {
524 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
526 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
528 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
529 eq(actorSelection(actorRef)), eqSerializedReadData());
531 expectBatchedModifications(actorRef, 1);
532 expectReadyTransaction(actorRef);
534 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
536 transactionProxy.read(TestModel.TEST_PATH);
538 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
540 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
542 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
544 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
546 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
547 BatchedModificationsReply.class);
549 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
551 verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
552 isA(BatchedModifications.class));
556 public void testReadyWithRecordingOperationFailure() throws Exception {
557 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
559 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
561 expectFailedBatchedModifications(actorRef);
563 expectReadyTransaction(actorRef);
565 doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
567 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
569 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
571 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
573 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
575 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
577 verifyCohortFutures(proxy, TestException.class);
579 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
583 public void testReadyWithReplyFailure() throws Exception {
584 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
586 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
588 expectBatchedModifications(actorRef, 1);
590 doReturn(Futures.failed(new TestException())).when(mockActorContext).
591 executeOperationAsync(eq(actorSelection(actorRef)),
592 isA(ReadyTransaction.SERIALIZABLE_CLASS));
594 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
596 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
598 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
600 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
602 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
604 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
605 BatchedModificationsReply.class);
607 verifyCohortFutures(proxy, TestException.class);
611 public void testReadyWithInitialCreateTransactionFailure() throws Exception {
613 doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
614 mockActorContext).findPrimaryShardAsync(anyString());
616 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
618 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
620 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
622 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
624 transactionProxy.delete(TestModel.TEST_PATH);
626 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
628 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
630 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
632 verifyCohortFutures(proxy, PrimaryNotFoundException.class);
636 public void testReadyWithInvalidReplyMessageType() throws Exception {
637 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
639 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
641 expectBatchedModifications(actorRef, 1);
643 doReturn(Futures.successful(new Object())).when(mockActorContext).
644 executeOperationAsync(eq(actorSelection(actorRef)),
645 isA(ReadyTransaction.SERIALIZABLE_CLASS));
647 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
649 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
651 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
653 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
655 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
657 verifyCohortFutures(proxy, IllegalArgumentException.class);
661 public void testUnusedTransaction() throws Exception {
662 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
664 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
666 assertEquals("canCommit", true, ready.canCommit().get());
667 ready.preCommit().get();
668 ready.commit().get();
672 public void testGetIdentifier() {
673 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
674 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
675 TransactionProxy.TransactionType.READ_ONLY);
677 Object id = transactionProxy.getIdentifier();
678 assertNotNull("getIdentifier returned null", id);
679 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
683 public void testClose() throws Exception{
684 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
686 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
687 eq(actorSelection(actorRef)), eqSerializedReadData());
689 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
691 transactionProxy.read(TestModel.TEST_PATH);
693 transactionProxy.close();
695 verify(mockActorContext).sendOperationAsync(
696 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
701 * Method to test a local Tx actor. The Tx paths are matched to decide if the
702 * Tx actor is local or not. This is done by mocking the Tx actor path
703 * and the caller paths and ensuring that the paths have the remote-address format
705 * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
706 * the paths returned for the actors for all the tests are not qualified remote paths.
707 * Hence are treated as non-local/remote actors. In short, all tests except
708 * few below run for remote actors
713 public void testLocalTxActorRead() throws Exception {
714 ActorSystem actorSystem = getSystem();
715 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
717 doReturn(actorSystem.actorSelection(shardActorRef.path())).
718 when(mockActorContext).actorSelection(shardActorRef.path().toString());
720 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
721 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
723 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
724 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
725 .setTransactionId("txn-1").setTransactionActorPath(actorPath).build();
727 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
728 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
729 eqCreateTransaction(memberName, READ_ONLY));
731 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
733 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
735 // negative test case with null as the reply
736 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
737 any(ActorSelection.class), eqReadData());
739 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
740 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
742 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
744 // test case with node as read data reply
745 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
747 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
748 any(ActorSelection.class), eqReadData());
750 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
752 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
754 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
756 // test for local data exists
757 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
758 any(ActorSelection.class), eqDataExists());
760 boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
762 assertEquals("Exists response", true, exists);
766 public void testLocalTxActorReady() throws Exception {
767 ActorSystem actorSystem = getSystem();
768 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
770 doReturn(actorSystem.actorSelection(shardActorRef.path())).
771 when(mockActorContext).actorSelection(shardActorRef.path().toString());
773 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
774 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
776 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
777 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
778 setTransactionId("txn-1").setTransactionActorPath(actorPath).
779 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
781 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
782 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
783 eqCreateTransaction(memberName, WRITE_ONLY));
785 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
787 doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
788 any(ActorSelection.class), isA(BatchedModifications.class));
790 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
792 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
793 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
795 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
796 BatchedModificationsReply.class);
799 doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
800 any(ActorSelection.class), isA(ReadyTransaction.class));
802 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
804 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
806 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
808 verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
811 private static interface TransactionProxyOperation {
812 void run(TransactionProxy transactionProxy);
815 private void throttleOperation(TransactionProxyOperation operation) {
816 throttleOperation(operation, 1, true);
819 private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
820 ActorSystem actorSystem = getSystem();
821 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
823 doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
825 doReturn(actorSystem.actorSelection(shardActorRef.path())).
826 when(mockActorContext).actorSelection(shardActorRef.path().toString());
829 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
830 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
832 doReturn(Futures.failed(new Exception("not found")))
833 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
836 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
837 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
838 setTransactionId("txn-1").setTransactionActorPath(actorPath).
839 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
841 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
842 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
843 eqCreateTransaction(memberName, READ_WRITE));
845 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
847 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
849 long start = System.nanoTime();
851 operation.run(transactionProxy);
853 long end = System.nanoTime();
855 long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
856 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
857 expected, (end-start)), (end - start) > expected);
861 private void completeOperation(TransactionProxyOperation operation){
862 completeOperation(operation, true);
865 private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
866 ActorSystem actorSystem = getSystem();
867 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
869 doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
871 doReturn(actorSystem.actorSelection(shardActorRef.path())).
872 when(mockActorContext).actorSelection(shardActorRef.path().toString());
875 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
876 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
878 doReturn(Futures.failed(new Exception("not found")))
879 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
882 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
883 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
884 setTransactionId("txn-1").setTransactionActorPath(actorPath).
885 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
887 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
888 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
889 eqCreateTransaction(memberName, READ_WRITE));
891 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
893 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
895 long start = System.nanoTime();
897 operation.run(transactionProxy);
899 long end = System.nanoTime();
901 long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
902 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
903 expected, (end-start)), (end - start) <= expected);
906 public void testWriteThrottling(boolean shardFound){
908 throttleOperation(new TransactionProxyOperation() {
910 public void run(TransactionProxy transactionProxy) {
911 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
913 expectBatchedModifications(2);
915 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
917 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
923 public void testWriteThrottlingWhenShardFound(){
924 throttleOperation(new TransactionProxyOperation() {
926 public void run(TransactionProxy transactionProxy) {
927 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
929 expectIncompleteBatchedModifications();
931 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
933 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
939 public void testWriteThrottlingWhenShardNotFound(){
940 // Confirm that there is no throttling when the Shard is not found
941 completeOperation(new TransactionProxyOperation() {
943 public void run(TransactionProxy transactionProxy) {
944 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
946 expectBatchedModifications(2);
948 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
950 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
958 public void testWriteCompletion(){
959 completeOperation(new TransactionProxyOperation() {
961 public void run(TransactionProxy transactionProxy) {
962 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
964 expectBatchedModifications(2);
966 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
968 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
974 public void testMergeThrottlingWhenShardFound(){
976 throttleOperation(new TransactionProxyOperation() {
978 public void run(TransactionProxy transactionProxy) {
979 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
981 expectIncompleteBatchedModifications();
983 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
985 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
991 public void testMergeThrottlingWhenShardNotFound(){
993 completeOperation(new TransactionProxyOperation() {
995 public void run(TransactionProxy transactionProxy) {
996 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
998 expectBatchedModifications(2);
1000 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1002 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1008 public void testMergeCompletion(){
1009 completeOperation(new TransactionProxyOperation() {
1011 public void run(TransactionProxy transactionProxy) {
1012 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1014 expectBatchedModifications(2);
1016 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1018 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1025 public void testDeleteThrottlingWhenShardFound(){
1027 throttleOperation(new TransactionProxyOperation() {
1029 public void run(TransactionProxy transactionProxy) {
1030 expectIncompleteBatchedModifications();
1032 transactionProxy.delete(TestModel.TEST_PATH);
1034 transactionProxy.delete(TestModel.TEST_PATH);
1041 public void testDeleteThrottlingWhenShardNotFound(){
1043 completeOperation(new TransactionProxyOperation() {
1045 public void run(TransactionProxy transactionProxy) {
1046 expectBatchedModifications(2);
1048 transactionProxy.delete(TestModel.TEST_PATH);
1050 transactionProxy.delete(TestModel.TEST_PATH);
1056 public void testDeleteCompletion(){
1057 completeOperation(new TransactionProxyOperation() {
1059 public void run(TransactionProxy transactionProxy) {
1060 expectBatchedModifications(2);
1062 transactionProxy.delete(TestModel.TEST_PATH);
1064 transactionProxy.delete(TestModel.TEST_PATH);
1071 public void testReadThrottlingWhenShardFound(){
1073 throttleOperation(new TransactionProxyOperation() {
1075 public void run(TransactionProxy transactionProxy) {
1076 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1077 any(ActorSelection.class), eqReadData());
1079 transactionProxy.read(TestModel.TEST_PATH);
1081 transactionProxy.read(TestModel.TEST_PATH);
1087 public void testReadThrottlingWhenShardNotFound(){
1089 completeOperation(new TransactionProxyOperation() {
1091 public void run(TransactionProxy transactionProxy) {
1092 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1093 any(ActorSelection.class), eqReadData());
1095 transactionProxy.read(TestModel.TEST_PATH);
1097 transactionProxy.read(TestModel.TEST_PATH);
1104 public void testReadCompletion(){
1105 completeOperation(new TransactionProxyOperation() {
1107 public void run(TransactionProxy transactionProxy) {
1108 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1110 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1111 any(ActorSelection.class), eqReadData());
1113 transactionProxy.read(TestModel.TEST_PATH);
1115 transactionProxy.read(TestModel.TEST_PATH);
1122 public void testExistsThrottlingWhenShardFound(){
1124 throttleOperation(new TransactionProxyOperation() {
1126 public void run(TransactionProxy transactionProxy) {
1127 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1128 any(ActorSelection.class), eqDataExists());
1130 transactionProxy.exists(TestModel.TEST_PATH);
1132 transactionProxy.exists(TestModel.TEST_PATH);
1138 public void testExistsThrottlingWhenShardNotFound(){
1140 completeOperation(new TransactionProxyOperation() {
1142 public void run(TransactionProxy transactionProxy) {
1143 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1144 any(ActorSelection.class), eqDataExists());
1146 transactionProxy.exists(TestModel.TEST_PATH);
1148 transactionProxy.exists(TestModel.TEST_PATH);
1155 public void testExistsCompletion(){
1156 completeOperation(new TransactionProxyOperation() {
1158 public void run(TransactionProxy transactionProxy) {
1159 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1160 any(ActorSelection.class), eqDataExists());
1162 transactionProxy.exists(TestModel.TEST_PATH);
1164 transactionProxy.exists(TestModel.TEST_PATH);
1171 public void testReadyThrottling(){
1173 throttleOperation(new TransactionProxyOperation() {
1175 public void run(TransactionProxy transactionProxy) {
1176 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1178 expectBatchedModifications(1);
1180 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1181 any(ActorSelection.class), any(ReadyTransaction.class));
1183 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1185 transactionProxy.ready();
1191 public void testReadyThrottlingWithTwoTransactionContexts(){
1193 throttleOperation(new TransactionProxyOperation() {
1195 public void run(TransactionProxy transactionProxy) {
1196 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1197 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1199 expectBatchedModifications(2);
1201 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1202 any(ActorSelection.class), any(ReadyTransaction.class));
1204 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1206 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1208 transactionProxy.ready();
1214 public void testModificationOperationBatching() throws Throwable {
1215 int shardBatchedModificationCount = 3;
1216 doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
1217 when(mockActorContext).getDatastoreContext();
1219 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1221 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1223 expectReadyTransaction(actorRef);
1225 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1226 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1228 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1229 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1231 YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1232 NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1234 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1235 NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1237 YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1238 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1240 YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1241 NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1243 YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1244 YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1246 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1248 transactionProxy.write(writePath1, writeNode1);
1249 transactionProxy.write(writePath2, writeNode2);
1250 transactionProxy.delete(deletePath1);
1251 transactionProxy.merge(mergePath1, mergeNode1);
1252 transactionProxy.merge(mergePath2, mergeNode2);
1253 transactionProxy.write(writePath3, writeNode3);
1254 transactionProxy.merge(mergePath3, mergeNode3);
1255 transactionProxy.delete(deletePath2);
1257 // This sends the last batch.
1258 transactionProxy.ready();
1260 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1261 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1263 verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
1264 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1266 verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
1267 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1269 verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3),
1270 new DeleteModification(deletePath2));
1272 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1273 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
1277 public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1278 int shardBatchedModificationCount = 10;
1279 doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
1280 when(mockActorContext).getDatastoreContext();
1282 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1284 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1286 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1287 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1289 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1290 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1292 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1293 NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1295 YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1296 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1298 YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1300 doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1301 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1303 doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1304 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1306 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1307 eq(actorSelection(actorRef)), eqSerializedDataExists());
1309 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1311 transactionProxy.write(writePath1, writeNode1);
1312 transactionProxy.write(writePath2, writeNode2);
1314 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1315 get(5, TimeUnit.SECONDS);
1317 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1318 assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1320 transactionProxy.merge(mergePath1, mergeNode1);
1321 transactionProxy.merge(mergePath2, mergeNode2);
1323 readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1325 transactionProxy.delete(deletePath);
1327 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1328 assertEquals("Exists response", true, exists);
1330 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1331 assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1333 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1334 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1336 verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
1337 new WriteModification(writePath2, writeNode2));
1339 verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
1340 new MergeModification(mergePath2, mergeNode2));
1342 verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath));
1344 InOrder inOrder = Mockito.inOrder(mockActorContext);
1345 inOrder.verify(mockActorContext).executeOperationAsync(
1346 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1348 inOrder.verify(mockActorContext).executeOperationAsync(
1349 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1351 inOrder.verify(mockActorContext).executeOperationAsync(
1352 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1354 inOrder.verify(mockActorContext).executeOperationAsync(
1355 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1357 inOrder.verify(mockActorContext).executeOperationAsync(
1358 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1360 inOrder.verify(mockActorContext).executeOperationAsync(
1361 eq(actorSelection(actorRef)), eqSerializedDataExists());
1363 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1364 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);