1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyString;
9 import static org.mockito.Matchers.eq;
10 import static org.mockito.Matchers.isA;
11 import static org.mockito.Mockito.doReturn;
12 import static org.mockito.Mockito.never;
13 import static org.mockito.Mockito.times;
14 import static org.mockito.Mockito.verify;
15 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
16 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
17 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSelection;
20 import akka.actor.ActorSystem;
21 import akka.actor.Props;
22 import akka.dispatch.Futures;
23 import com.google.common.base.Optional;
24 import com.google.common.util.concurrent.CheckedFuture;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicReference;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.mockito.InOrder;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
40 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
41 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
42 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
44 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
45 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
48 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
49 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
50 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
51 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
52 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
57 import scala.concurrent.Await;
58 import scala.concurrent.Future;
59 import scala.concurrent.Promise;
60 import scala.concurrent.duration.Duration;
62 @SuppressWarnings("resource")
63 public class TransactionProxyTest extends AbstractTransactionProxyTest {
/**
 * Unchecked marker exception that the tests in this class inject as a simulated failure
 * (for example via Futures.failed(new TestException())) and then expect to see surfaced.
 */
65 @SuppressWarnings("serial")
66 static class TestException extends RuntimeException {
/**
 * Callback that performs one read-style operation on a TransactionProxy and hands back the
 * resulting future, so a single failure scenario can be shared by the read() and exists() tests.
 */
69 static interface Invoker {
// Runs the operation under test against the given proxy and returns its CheckedFuture.
70 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
/**
 * Exercises read() on a READ_ONLY transaction for both an empty and a populated reply.
 *
 * The reply returned by the mocked executeOperationAsync is first built from null and then from
 * a container node; the test asserts that the returned Optional is absent in the first case and
 * holds the expected node in the second.
 */
74 public void testRead() throws Exception {
// NOTE(review): setupActorContextWithInitialCreateTransaction is defined outside this excerpt;
// presumably it stubs transaction creation and returns the transaction actor - confirm.
75 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
77 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
// First pass: the read reply is built from null, i.e. no data.
79 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
80 eq(actorSelection(actorRef)), eqSerializedReadData());
82 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
83 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
85 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
87 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Second pass: re-stub the same call so the reply now carries expectedNode.
89 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
90 eq(actorSelection(actorRef)), eqSerializedReadData());
92 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
94 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
96 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
/**
 * Verifies that read() fails with ReadFailedException when the reply to the read message is of
 * an unexpected type (here a plain Object).
 */
99 @Test(expected = ReadFailedException.class)
100 public void testReadWithInvalidReplyMessageType() throws Exception {
101 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
// The read completes successfully, but with an arbitrary Object as the reply message.
103 doReturn(Futures.successful(new Object())).when(mockActorContext).
104 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
106 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
// checkedGet is where the expected ReadFailedException surfaces.
108 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
/**
 * Verifies that a failed read future (failed with TestException) is reported to the caller with
 * TestException as the underlying cause.
 */
111 @Test(expected = TestException.class)
112 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
113 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
// The read operation itself fails asynchronously.
115 doReturn(Futures.failed(new TestException())).when(mockActorContext).
116 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
118 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
// NOTE(review): propagateReadFailedExceptionCause is defined outside this excerpt; presumably it
// waits on the future and rethrows the cause of the ReadFailedException - confirm.
120 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
/**
 * Shared driver for tests in which setting up the transaction fails.
 *
 * A PrimaryNotFoundException is injected into findPrimaryShardAsync; for other exceptions the
 * primary shard lookup is stubbed to succeed with a do-nothing actor. executeOperationAsync is
 * stubbed to fail with the injected exception for any actor selection and message.
 *
 * @param exToThrow the failure to inject
 * @param invoker the proxy operation to run against the failing setup
 */
123 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
125 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
127 if (exToThrow instanceof PrimaryNotFoundException) {
// The shard lookup itself fails.
128 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
// The shard lookup succeeds and resolves to the do-nothing actor.
130 doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
131 when(mockActorContext).findPrimaryShardAsync(anyString());
// Any operation sent to any actor fails with the injected exception.
134 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
135 any(ActorSelection.class), any());
137 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
// NOTE(review): presumably rethrows the cause of the resulting ReadFailedException so callers can
// use @Test(expected = ...) on the injected type - confirm against the helper.
139 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
/**
 * Runs the shared create-transaction failure scenario with read() as the operation under test.
 *
 * @param exToThrow the failure to inject
 */
142 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
143 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
145 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
// The operation under test is a read of the test path.
146 return proxy.read(TestModel.TEST_PATH);
/**
 * Expects read() to surface PrimaryNotFoundException when that exception is injected into the
 * shared create-transaction failure scenario.
 */
151 @Test(expected = PrimaryNotFoundException.class)
152 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
153 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
/**
 * Expects read() to surface the datastore TimeoutException when that exception is injected into
 * the shared create-transaction failure scenario.
 */
156 @Test(expected = TimeoutException.class)
157 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
158 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
159 new Exception("reason")));
/**
 * Expects read() to surface an arbitrary exception (TestException) injected into the shared
 * create-transaction failure scenario.
 */
162 @Test(expected = TestException.class)
163 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
164 testReadWithExceptionOnInitialCreateTransaction(new TestException());
/**
 * Verifies that a read issued after failed modifications reports TestException and that the
 * read message itself is never sent to the transaction actor.
 */
167 @Test(expected = TestException.class)
168 public void testReadWithPriorRecordingOperationFailure() throws Throwable {
// Use a batched-modification count of 2; the test then records exactly two modifications
// (write + delete) before reading.
169 doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
170 when(mockActorContext).getDatastoreContext();
172 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
174 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// NOTE(review): helper defined outside this excerpt; presumably makes BatchedModifications fail
// with TestException - confirm.
176 expectFailedBatchedModifications(actorRef);
178 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
179 eq(actorSelection(actorRef)), eqSerializedReadData());
181 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
183 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
185 transactionProxy.delete(TestModel.TEST_PATH);
188 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
// The read message must not have been dispatched at all.
190 verify(mockActorContext, times(0)).executeOperationAsync(
191 eq(actorSelection(actorRef)), eqSerializedReadData());
/**
 * Verifies that a read following a successful write returns the expected node and that the
 * BatchedModifications message is sent to the transaction actor before the read message.
 */
196 public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
197 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
199 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
201 expectBatchedModifications(actorRef, 1);
203 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
204 eq(actorSelection(actorRef)), eqSerializedReadData());
206 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
208 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
210 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
211 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
213 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
214 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
// Ordering check: the batched modifications must go out first, then the read.
216 InOrder inOrder = Mockito.inOrder(mockActorContext);
217 inOrder.verify(mockActorContext).executeOperationAsync(
218 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
220 inOrder.verify(mockActorContext).executeOperationAsync(
221 eq(actorSelection(actorRef)), eqSerializedReadData());
/**
 * Expects read() on a WRITE_ONLY transaction to be rejected with IllegalStateException.
 */
224 @Test(expected=IllegalStateException.class)
225 public void testReadPreConditionCheck() {
226 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
227 transactionProxy.read(TestModel.TEST_PATH);
/**
 * Expects IllegalArgumentException as the underlying cause when the reply to the
 * create-transaction message is of an unexpected type (a plain Object).
 */
230 @Test(expected=IllegalArgumentException.class)
231 public void testInvalidCreateTransactionReply() throws Throwable {
232 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
// Resolve the actor's path string to a selection of the do-nothing actor.
234 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
235 actorSelection(actorRef.path().toString());
// The default shard's primary is found and is the do-nothing actor.
237 doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
238 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// The create-transaction request succeeds, but with an arbitrary Object as the reply.
240 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
241 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
243 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
245 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
/**
 * Exercises exists() on a READ_ONLY transaction: the stubbed data-exists reply is first built
 * from false and then from true, and the returned value is asserted to match each time.
 */
249 public void testExists() throws Exception {
250 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
252 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
254 doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
255 eq(actorSelection(actorRef)), eqSerializedDataExists());
// NOTE(review): checkedGet() is called without a timeout here, unlike the 5-second waits used
// by the read tests; a future that never completes would block the test - consider a timeout.
257 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
259 assertEquals("Exists response", false, exists);
// Re-stub the same call so the reply now reports that the data exists.
261 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
262 eq(actorSelection(actorRef)), eqSerializedDataExists());
264 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
266 assertEquals("Exists response", true, exists);
/**
 * Expects exists() to surface PrimaryNotFoundException when that exception is injected into the
 * shared create-transaction failure scenario.
 */
269 @Test(expected = PrimaryNotFoundException.class)
270 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
271 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
273 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
// The operation under test is an existence check on the test path.
274 return proxy.exists(TestModel.TEST_PATH);
/**
 * Verifies that exists() fails with ReadFailedException when the reply to the data-exists
 * message is of an unexpected type (here a plain Object).
 */
279 @Test(expected = ReadFailedException.class)
280 public void testExistsWithInvalidReplyMessageType() throws Exception {
281 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
// The data-exists request completes successfully, but with an arbitrary Object as the reply.
283 doReturn(Futures.successful(new Object())).when(mockActorContext).
284 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
286 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
// checkedGet is where the expected ReadFailedException surfaces.
289 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
/**
 * Verifies that a failed data-exists future (failed with TestException) is reported to the
 * caller with TestException as the underlying cause.
 */
292 @Test(expected = TestException.class)
293 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
294 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
// The data-exists operation itself fails asynchronously.
296 doReturn(Futures.failed(new TestException())).when(mockActorContext).
297 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
299 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
301 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
/**
 * Verifies that exists() issued after failed modifications reports TestException and that the
 * data-exists message itself is never sent to the transaction actor.
 */
304 @Test(expected = TestException.class)
305 public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
// Use a batched-modification count of 2; the test then records exactly two modifications
// (write + delete) before calling exists().
306 doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
307 when(mockActorContext).getDatastoreContext();
309 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
311 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// NOTE(review): helper defined outside this excerpt; presumably makes BatchedModifications fail
// with TestException - confirm.
313 expectFailedBatchedModifications(actorRef);
315 doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
316 eq(actorSelection(actorRef)), eqSerializedDataExists());
318 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
321 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
323 transactionProxy.delete(TestModel.TEST_PATH);
326 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
// The data-exists message must not have been dispatched at all.
328 verify(mockActorContext, times(0)).executeOperationAsync(
329 eq(actorSelection(actorRef)), eqSerializedDataExists());
/**
 * Verifies that exists() following a successful write returns true and that the
 * BatchedModifications message is sent to the transaction actor before the data-exists message.
 */
334 public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
335 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
337 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
339 expectBatchedModifications(actorRef, 1);
341 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
342 eq(actorSelection(actorRef)), eqSerializedDataExists());
344 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
346 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
348 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
350 assertEquals("Exists response", true, exists);
// Ordering check: the batched modifications must go out first, then the data-exists request.
352 InOrder inOrder = Mockito.inOrder(mockActorContext);
353 inOrder.verify(mockActorContext).executeOperationAsync(
354 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
356 inOrder.verify(mockActorContext).executeOperationAsync(
357 eq(actorSelection(actorRef)), eqSerializedDataExists());
/**
 * Expects exists() on a WRITE_ONLY transaction to be rejected with IllegalStateException.
 */
360 @Test(expected=IllegalStateException.class)
361 public void testExistsPreConditionCheck() {
362 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
363 transactionProxy.exists(TestModel.TEST_PATH);
/**
 * Checks the futures recorded for a transaction's operations against the expected outcomes.
 *
 * @param futures the recorded operation futures to inspect
 * @param expResultTypes the expected outcome per future, in order; for a Throwable subtype the
 *        future is awaited and expected to fail, for other types the class of the awaited result
 *        is compared against the expected type
 * @throws Exception if awaiting a future fails unexpectedly
 */
366 private void verifyRecordingOperationFutures(List<Future<Object>> futures,
367 Class<?>... expResultTypes) throws Exception {
// The number of recorded futures must equal the number of expectations.
368 assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
371 for( Future<Object> future: futures) {
372 assertNotNull("Recording operation Future is null", future);
// i is advanced here, so afterwards it is one past the index of the current future.
374 Class<?> expResultType = expResultTypes[i++];
375 if(Throwable.class.isAssignableFrom(expResultType)) {
// fail() raises an AssertionError, which is not an Exception and so is not caught below.
377 Await.result(future, Duration.create(5, TimeUnit.SECONDS));
378 fail("Expected exception from recording operation Future");
379 } catch(Exception e) {
// NOTE(review): i was already incremented by i++ above, so "i +1" labels the first future as
// operation 2; using i alone would give the 1-based position - confirm and adjust.
383 assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
384 Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
/**
 * Verifies that write() on a WRITE_ONLY transaction, with the batched-modification count set
 * to 1, results in one batched modification containing the corresponding WriteModification.
 */
390 public void testWrite() throws Exception {
391 dataStoreContextBuilder.shardBatchedModificationCount(1);
392 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
394 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
396 expectBatchedModifications(actorRef, 1);
398 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
400 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// NOTE(review): the trailing false is presumably the expected "ready" flag of the batch -
// confirm against verifyOneBatchedModification, which is defined outside this excerpt.
402 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
/**
 * Verifies that a write issued from the completion callback of an asynchronous read is batched
 * and sent when the transaction is readied.
 *
 * The create-transaction reply is held back behind a Promise so that read() is issued before
 * the transaction has been created; the promise is completed afterwards, which lets the read
 * (and the write in its callback) proceed.
 */
406 public void testWriteAfterAsyncRead() throws Throwable {
407 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
// Reply to the create-transaction request with a promise the test completes later.
409 Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
410 doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
411 eq(getSystem().actorSelection(actorRef.path())),
412 eqCreateTransaction(memberName, READ_WRITE));
414 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
415 eq(actorSelection(actorRef)), eqSerializedReadData());
417 expectBatchedModifications(actorRef, 1);
418 expectReadyTransaction(actorRef);
420 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
422 final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
// readComplete is counted down by either callback; caughtEx is later checked and rethrown
// by the test thread if it was set.
424 final CountDownLatch readComplete = new CountDownLatch(1);
425 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
426 com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
427 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
429 public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
// Issue the write from inside the read callback.
431 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
432 } catch (Exception e) {
435 readComplete.countDown();
440 public void onFailure(Throwable t) {
442 readComplete.countDown();
// Complete transaction creation now that the read has been issued.
446 createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
// NOTE(review): awaitUninterruptibly returns false on timeout, but the result is ignored here,
// so a read that never completes would not fail the test at this point - consider asserting it.
448 Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
450 if(caughtEx.get() != null) {
451 throw caughtEx.get();
454 // This sends the batched modification.
455 transactionProxy.ready();
457 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
459 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
460 BatchedModificationsReply.class);
/**
 * Expects write() on a READ_ONLY transaction to be rejected with IllegalStateException.
 */
463 @Test(expected=IllegalStateException.class)
464 public void testWritePreConditionCheck() {
465 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
466 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
 * Expects IllegalStateException when write() is called on a WRITE_ONLY transaction after
 * ready() has already been called on it.
 */
469 @Test(expected=IllegalStateException.class)
470 public void testWriteAfterReadyPreConditionCheck() {
471 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
473 transactionProxy.ready();
// Writing after ready() is the operation expected to be rejected.
475 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
 * Verifies that merge() on a WRITE_ONLY transaction, with the batched-modification count set
 * to 1, results in one batched modification containing the corresponding MergeModification.
 */
479 public void testMerge() throws Exception {
480 dataStoreContextBuilder.shardBatchedModificationCount(1);
481 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
483 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
485 expectBatchedModifications(actorRef, 1);
487 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
489 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
491 verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
/**
 * Verifies that delete() on a WRITE_ONLY transaction, with the batched-modification count set
 * to 1, results in one batched modification containing the corresponding DeleteModification.
 */
495 public void testDelete() throws Exception {
496 dataStoreContextBuilder.shardBatchedModificationCount(1);
497 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
499 expectBatchedModifications(actorRef, 1);
501 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
503 transactionProxy.delete(TestModel.TEST_PATH);
505 verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
/**
 * Verifies ready() on a READ_WRITE transaction after a read and a write: the result must be a
 * ThreePhaseCommitCohortProxy, one BatchedModificationsReply must have been recorded, and both
 * a BatchedModifications and a serialized ReadyTransaction message must have been sent.
 */
509 public void testReadyWithReadWrite() throws Exception {
510 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
512 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
514 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
515 eq(actorSelection(actorRef)), eqSerializedReadData());
517 expectBatchedModifications(actorRef, 1);
518 expectReadyTransaction(actorRef);
520 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
522 transactionProxy.read(TestModel.TEST_PATH);
524 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
526 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
528 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
530 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
// Exactly one recorded operation future, resolving to a BatchedModificationsReply.
532 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
533 BatchedModificationsReply.class);
// NOTE(review): verifyCohortFutures is defined outside this excerpt; presumably it checks that
// the cohort futures resolve to the given actor selection - confirm.
535 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
537 verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
538 isA(BatchedModifications.class));
540 verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
541 isA(ReadyTransaction.SERIALIZABLE_CLASS));
/**
 * Verifies ready() on a WRITE_ONLY transaction with write-only optimizations enabled while a
 * written modification is still pending: a single BatchedModifications message carrying the
 * write is captured, no operation futures are recorded, and no ReadyTransaction message is sent.
 */
545 public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
546 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
548 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
550 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
552 expectBatchedModificationsReady(actorRef, 1);
554 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
556 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
558 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
560 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
562 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
// No expected types are passed, so this asserts that no operation futures were recorded.
564 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures());
566 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
568 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
569 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
// NOTE(review): the boolean is presumably the expected "ready" flag of the batch - confirm
// against verifyBatchedModifications, which is defined outside this excerpt.
571 verifyBatchedModifications(batchedModifications.get(0), true,
572 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
// A separate ReadyTransaction message must not be sent.
574 verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
575 isA(ReadyTransaction.SERIALIZABLE_CLASS));
/**
 * Verifies ready() on a WRITE_ONLY transaction with write-only optimizations enabled and a
 * batched-modification count of 1: two BatchedModifications messages are captured, the first
 * carrying the write and the second carrying no modifications, and no ReadyTransaction
 * message is sent.
 */
579 public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
580 dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
581 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
583 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
585 expectBatchedModificationsReady(actorRef, 1);
587 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
589 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
591 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
593 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
595 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
// One recorded operation future, resolving to a BatchedModificationsReply.
597 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
598 BatchedModificationsReply.class);
600 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
602 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
603 assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
// First batch: the write, checked with flag false.
605 verifyBatchedModifications(batchedModifications.get(0), false,
606 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
// Second batch: no modifications, checked with flag true.
608 verifyBatchedModifications(batchedModifications.get(1), true);
// A separate ReadyTransaction message must not be sent.
610 verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
611 isA(ReadyTransaction.SERIALIZABLE_CLASS));
/**
 * Verifies ready() on a WRITE_ONLY transaction whose batched modifications fail: the cohort
 * futures are checked against TestException and the single recorded operation future is
 * expected to fail.
 */
615 public void testReadyWithRecordingOperationFailure() throws Exception {
616 dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
618 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
620 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
622 expectFailedBatchedModifications(actorRef);
// Report the transaction actor's path as not local.
624 doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
626 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
628 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
630 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
632 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
634 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
636 verifyCohortFutures(proxy, TestException.class);
// A Throwable type tells the verifier that awaiting the recorded future must throw.
638 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
/**
 * Verifies ready() on a WRITE_ONLY transaction (write-only optimizations enabled) after a
 * merge whose batched modifications fail: the cohort futures are checked against TestException.
 */
642 public void testReadyWithReplyFailure() throws Exception {
643 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
645 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
647 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
649 expectFailedBatchedModifications(actorRef);
651 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
653 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
655 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
657 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
659 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
661 verifyCohortFutures(proxy, TestException.class);
/**
 * Shared driver for WRITE_ONLY tests in which the primary shard lookup fails.
 *
 * merge, write and delete are each invoked and the transaction is then readied; the cohort
 * futures are checked against the class of the injected exception.
 *
 * @param toThrow the failure returned by findPrimaryShardAsync
 */
664 private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
// Every primary shard lookup fails with the injected exception.
665 doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
667 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
669 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
671 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
673 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
675 transactionProxy.delete(TestModel.TEST_PATH);
677 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
679 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
681 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
683 verifyCohortFutures(proxy, toThrow.getClass());
/**
 * Runs the write-only primary-shard-lookup failure scenario with a PrimaryNotFoundException.
 */
687 public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
688 testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
/**
 * Runs the write-only primary-shard-lookup failure scenario with a NotInitializedException.
 */
692 public void testWriteOnlyTxWithNotInitializedException() throws Exception {
693 testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
/**
 * Runs the write-only primary-shard-lookup failure scenario with a NoShardLeaderException.
 */
697 public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
698 testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
/**
 * Verifies ready() on a WRITE_ONLY transaction (write-only optimizations enabled) when the
 * reply to BatchedModifications is of an unexpected type (a plain Object): the cohort futures
 * are checked against IllegalArgumentException.
 */
702 public void testReadyWithInvalidReplyMessageType() throws Exception {
703 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
704 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
706 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// NOTE(review): the line below is commented-out code; the stub that follows it supplies the
// invalid reply instead. Consider deleting the dead line.
708 //expectBatchedModifications(actorRef, 1);
// BatchedModifications is answered successfully, but with an arbitrary Object.
710 doReturn(Futures.successful(new Object())).when(mockActorContext).
711 executeOperationAsync(eq(actorSelection(actorRef)),
712 isA(BatchedModifications.class));
714 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
716 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
718 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
720 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
722 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
724 verifyCohortFutures(proxy, IllegalArgumentException.class);
/**
 * Verifies that getIdentifier() returns a non-null identifier whose string form starts with
 * the member name.
 */
728 public void testGetIdentifier() {
729 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
730 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
731 TransactionProxy.TransactionType.READ_ONLY);
733 Object id = transactionProxy.getIdentifier();
734 assertNotNull("getIdentifier returned null", id);
735 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
/**
 * Verifies that close() on a READ_WRITE transaction, after a read has been issued, sends a
 * serialized CloseTransaction message to the transaction actor via sendOperationAsync.
 */
739 public void testClose() throws Exception{
740 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
742 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
743 eq(actorSelection(actorRef)), eqSerializedReadData());
745 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
// Issue a read before closing; its result is not inspected.
747 transactionProxy.read(TestModel.TEST_PATH);
749 transactionProxy.close();
751 verify(mockActorContext).sendOperationAsync(
752 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
/**
 * Method to test a local Tx actor. The Tx paths are matched to decide whether the
 * Tx actor is local or not. This is done by mocking the Tx actor path
 * and the caller paths and ensuring that the paths have the remote-address format.
 *
 * Note: Since the default Akka provider for tests is not a RemoteActorRefProvider,
 * the paths returned for the actors in all the tests are not qualified remote paths.
 * Hence they are treated as non-local/remote actors. In short, all tests except
 * the few below run against remote actors.
 */
// Exercises read() and exists() on a READ_ONLY transaction while isPathLocal is stubbed to
// return true for every path. The replies are stubbed with readDataReply/dataExistsReply and
// matched with eqReadData/eqDataExists rather than the "serialized" variants used elsewhere.
769 public void testLocalTxActorRead() throws Exception {
770 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
// Treat every actor path as local.
771 doReturn(true).when(mockActorContext).isPathLocal(anyString());
773 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
775 // negative test case with null as the reply
776 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
777 any(ActorSelection.class), eqReadData());
779 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
780 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
782 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
784 // test case with node as read data reply
785 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
787 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
788 any(ActorSelection.class), eqReadData());
790 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
792 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
794 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
796 // test for local data exists
797 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
798 any(ActorSelection.class), eqDataExists());
800 boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
802 assertEquals("Exists response", true, exists);
806 public void testLocalTxActorReady() throws Exception {
807 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
808 doReturn(true).when(mockActorContext).isPathLocal(anyString());
810 doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
811 any(ActorSelection.class), isA(BatchedModifications.class));
813 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
815 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
816 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
819 doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
820 eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
822 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
824 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
826 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
828 verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
831 private static interface TransactionProxyOperation {
832 void run(TransactionProxy transactionProxy);
835 private void throttleOperation(TransactionProxyOperation operation) {
836 throttleOperation(operation, 1, true);
// Runs the given operation against a READ_WRITE TransactionProxy built on the mocked
// actor context and asserts that it took LONGER than the configured operation timeout,
// i.e. that the caller was throttled.
//   operation           - the proxy calls to time
//   outstandingOpsLimit - value stubbed for getTransactionOutstandingOperationLimit()
//   shardFound          - NOTE(review): not referenced on any line shown here; presumably it
//                         selects between the success/failure primary-shard stubs below - confirm
839 private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
840 ActorSystem actorSystem = getSystem();
// Stand-in for the shard actor (DoNothingActor presumably ignores all messages - confirm).
841 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
843 doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
845 doReturn(actorSystem.actorSelection(shardActorRef.path())).
846 when(mockActorContext).actorSelection(shardActorRef.path().toString());
// Primary-shard lookup stub, success variant: resolves to the stand-in shard actor.
849 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
850 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// Primary-shard lookup stub, failure variant: fails with a generic Exception.
852 doReturn(Futures.failed(new Exception("not found")))
853 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// The create-transaction reply advertises a hard-coded remote-style path; no actor is
// started for that path in this method.
856 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
857 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
858 setTransactionId("txn-1").setTransactionActorPath(actorPath).
859 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
861 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
862 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
863 eqCreateTransaction(memberName, READ_WRITE));
// Only this exact path is reported as local.
865 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
867 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
// Time nothing but the operation itself.
869 long start = System.nanoTime();
871 operation.run(transactionProxy);
873 long end = System.nanoTime();
// Throttling counts as observed only when the elapsed time strictly exceeds the
// datastore's operation timeout (converted to nanoseconds).
875 long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
876 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
877 expected, (end-start)), (end - start) > expected);
881 private void completeOperation(TransactionProxyOperation operation){
882 completeOperation(operation, true);
// Runs the given operation against a READ_WRITE TransactionProxy built on the mocked
// actor context and asserts that it finished WITHIN the configured operation timeout,
// i.e. that the caller was not throttled.
//   operation  - the proxy calls to time
//   shardFound - NOTE(review): not referenced on any line shown here; presumably it selects
//                between the success/failure primary-shard stubs below - confirm
885 private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
886 ActorSystem actorSystem = getSystem();
// Stand-in for the shard actor (DoNothingActor presumably ignores all messages - confirm).
887 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
// The outstanding-operation limit is fixed at 1 for this helper.
889 doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
891 doReturn(actorSystem.actorSelection(shardActorRef.path())).
892 when(mockActorContext).actorSelection(shardActorRef.path().toString());
// Primary-shard lookup stub, success variant: resolves to the stand-in shard actor.
895 doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
896 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// Primary-shard lookup stub, failure variant: fails with PrimaryNotFoundException.
898 doReturn(Futures.failed(new PrimaryNotFoundException("test")))
899 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// Here the transaction actor is a real (do-nothing) actor and the reply carries its path.
902 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
903 String actorPath = txActorRef.path().toString();
904 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
905 setTransactionId("txn-1").setTransactionActorPath(actorPath).
906 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
908 doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
910 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
911 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
912 eqCreateTransaction(memberName, READ_WRITE));
// Every path is reported as local.
914 doReturn(true).when(mockActorContext).isPathLocal(anyString());
916 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
// Time nothing but the operation itself.
918 long start = System.nanoTime();
920 operation.run(transactionProxy);
922 long end = System.nanoTime();
// The operation passes when the elapsed time does not exceed the datastore's
// operation timeout (converted to nanoseconds).
924 long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
925 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
926 expected, (end-start)), (end - start) <= expected);
// Writes the same node twice under throttleOperation(...), after calling
// expectBatchedModifications(2).
// NOTE(review): the shardFound parameter is not referenced on any visible line, and a
// method that takes a parameter cannot be run by JUnit 4 as a test - confirm whether
// this method is still needed.
929 public void testWriteThrottling(boolean shardFound){
931 throttleOperation(new TransactionProxyOperation() {
933 public void run(TransactionProxy transactionProxy) {
934 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
936 expectBatchedModifications(2);
938 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
940 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// Throttling case for write(): two writes are timed by throttleOperation, which asserts
// that they take longer than the operation timeout.
946 public void testWriteThrottlingWhenShardFound(){
// Batch size 1 - presumably so that every write is sent as its own batch (confirm).
947 dataStoreContextBuilder.shardBatchedModificationCount(1);
948 throttleOperation(new TransactionProxyOperation() {
950 public void run(TransactionProxy transactionProxy) {
951 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Going by the helper's name, the batched-modification replies never complete - confirm.
953 expectIncompleteBatchedModifications();
955 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
957 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// Shard-not-found case for write(): two writes are timed by completeOperation, which
// asserts that they finish within the operation timeout.
// NOTE(review): confirm this call passes shardFound=false; the one-argument
// completeOperation overload defaults it to true.
963 public void testWriteThrottlingWhenShardNotFound(){
964 // Confirm that there is no throttling when the Shard is not found
965 dataStoreContextBuilder.shardBatchedModificationCount(1);
966 completeOperation(new TransactionProxyOperation() {
968 public void run(TransactionProxy transactionProxy) {
969 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
971 expectBatchedModifications(2);
973 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
975 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// Completion case for write(): two writes are timed by completeOperation, which asserts
// that they finish within the operation timeout.
983 public void testWriteCompletion(){
// Batch size 1 - presumably so that every write is sent as its own batch (confirm).
984 dataStoreContextBuilder.shardBatchedModificationCount(1);
985 completeOperation(new TransactionProxyOperation() {
987 public void run(TransactionProxy transactionProxy) {
988 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Presumably stubs replies for two batched-modification messages - confirm.
990 expectBatchedModifications(2);
992 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
994 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// Throttling case for merge(): two merges are timed by throttleOperation, which asserts
// that they take longer than the operation timeout.
1000 public void testMergeThrottlingWhenShardFound(){
// Batch size 1 - presumably so that every merge is sent as its own batch (confirm).
1001 dataStoreContextBuilder.shardBatchedModificationCount(1);
1002 throttleOperation(new TransactionProxyOperation() {
1004 public void run(TransactionProxy transactionProxy) {
1005 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Going by the helper's name, the batched-modification replies never complete - confirm.
1007 expectIncompleteBatchedModifications();
1009 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1011 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
// Shard-not-found case for merge(): two merges are timed by completeOperation, which
// asserts that they finish within the operation timeout.
// NOTE(review): confirm this call passes shardFound=false; the one-argument
// completeOperation overload defaults it to true.
1017 public void testMergeThrottlingWhenShardNotFound(){
1018 dataStoreContextBuilder.shardBatchedModificationCount(1);
1019 completeOperation(new TransactionProxyOperation() {
1021 public void run(TransactionProxy transactionProxy) {
1022 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1024 expectBatchedModifications(2);
1026 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1028 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
// Completion case for merge(): two merges are timed by completeOperation, which asserts
// that they finish within the operation timeout.
1034 public void testMergeCompletion(){
// Batch size 1 - presumably so that every merge is sent as its own batch (confirm).
1035 dataStoreContextBuilder.shardBatchedModificationCount(1);
1036 completeOperation(new TransactionProxyOperation() {
1038 public void run(TransactionProxy transactionProxy) {
1039 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Presumably stubs replies for two batched-modification messages - confirm.
1041 expectBatchedModifications(2);
1043 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1045 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
// Throttling case for delete(): two deletes are timed by throttleOperation, which asserts
// that they take longer than the operation timeout.
// NOTE(review): unlike the write/merge variants, no batch size is set on any visible
// line here - confirm that is intended.
1052 public void testDeleteThrottlingWhenShardFound(){
1054 throttleOperation(new TransactionProxyOperation() {
1056 public void run(TransactionProxy transactionProxy) {
// Going by the helper's name, the batched-modification replies never complete - confirm.
1057 expectIncompleteBatchedModifications();
1059 transactionProxy.delete(TestModel.TEST_PATH);
1061 transactionProxy.delete(TestModel.TEST_PATH);
// Shard-not-found case for delete(): two deletes are timed by completeOperation, which
// asserts that they finish within the operation timeout.
// NOTE(review): confirm this call passes shardFound=false; the one-argument
// completeOperation overload defaults it to true.
1068 public void testDeleteThrottlingWhenShardNotFound(){
1070 completeOperation(new TransactionProxyOperation() {
1072 public void run(TransactionProxy transactionProxy) {
1073 expectBatchedModifications(2);
1075 transactionProxy.delete(TestModel.TEST_PATH);
1077 transactionProxy.delete(TestModel.TEST_PATH);
// Completion case for delete(): two deletes are timed by completeOperation, which asserts
// that they finish within the operation timeout.
1083 public void testDeleteCompletion(){
// Batch size 1 - presumably so that every delete is sent as its own batch (confirm).
1084 dataStoreContextBuilder.shardBatchedModificationCount(1);
1085 completeOperation(new TransactionProxyOperation() {
1087 public void run(TransactionProxy transactionProxy) {
// Presumably stubs replies for two batched-modification messages - confirm.
1088 expectBatchedModifications(2);
1090 transactionProxy.delete(TestModel.TEST_PATH);
1092 transactionProxy.delete(TestModel.TEST_PATH);
// Throttling case for read(): two reads are timed by throttleOperation, which asserts
// that they take longer than the operation timeout.
1099 public void testReadThrottlingWhenShardFound(){
1101 throttleOperation(new TransactionProxyOperation() {
1103 public void run(TransactionProxy transactionProxy) {
// Read requests get incompleteFuture() - presumably a future that never completes (confirm).
1104 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1105 any(ActorSelection.class), eqReadData());
// The returned futures are deliberately not waited on; only the call time matters.
1107 transactionProxy.read(TestModel.TEST_PATH);
1109 transactionProxy.read(TestModel.TEST_PATH);
// Shard-not-found case for read(): even with the read stubbed to incompleteFuture(),
// completeOperation asserts that both calls return within the operation timeout.
// NOTE(review): confirm this call passes shardFound=false; the one-argument
// completeOperation overload defaults it to true.
1115 public void testReadThrottlingWhenShardNotFound(){
1117 completeOperation(new TransactionProxyOperation() {
1119 public void run(TransactionProxy transactionProxy) {
1120 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1121 any(ActorSelection.class), eqReadData());
1123 transactionProxy.read(TestModel.TEST_PATH);
1125 transactionProxy.read(TestModel.TEST_PATH);
// Completion case for read(): read requests are answered with a reply built from a
// node, and completeOperation asserts that two reads finish within the operation timeout.
1132 public void testReadCompletion(){
1133 completeOperation(new TransactionProxyOperation() {
1135 public void run(TransactionProxy transactionProxy) {
1136 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1138 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1139 any(ActorSelection.class), eqReadData());
// The read results are not inspected; only the call time matters here.
1141 transactionProxy.read(TestModel.TEST_PATH);
1143 transactionProxy.read(TestModel.TEST_PATH);
// Throttling case for exists(): two exists() calls are timed by throttleOperation, which
// asserts that they take longer than the operation timeout.
1150 public void testExistsThrottlingWhenShardFound(){
1152 throttleOperation(new TransactionProxyOperation() {
1154 public void run(TransactionProxy transactionProxy) {
// Exists requests get incompleteFuture() - presumably a future that never completes (confirm).
1155 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1156 any(ActorSelection.class), eqDataExists());
1158 transactionProxy.exists(TestModel.TEST_PATH);
1160 transactionProxy.exists(TestModel.TEST_PATH);
// Shard-not-found case for exists(): even with exists stubbed to incompleteFuture(),
// completeOperation asserts that both calls return within the operation timeout.
// NOTE(review): confirm this call passes shardFound=false; the one-argument
// completeOperation overload defaults it to true.
1166 public void testExistsThrottlingWhenShardNotFound(){
1168 completeOperation(new TransactionProxyOperation() {
1170 public void run(TransactionProxy transactionProxy) {
1171 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1172 any(ActorSelection.class), eqDataExists());
1174 transactionProxy.exists(TestModel.TEST_PATH);
1176 transactionProxy.exists(TestModel.TEST_PATH);
// Completion case for exists(): exists requests are answered with dataExistsReply(true),
// and completeOperation asserts that two calls finish within the operation timeout.
1183 public void testExistsCompletion(){
1184 completeOperation(new TransactionProxyOperation() {
1186 public void run(TransactionProxy transactionProxy) {
1187 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1188 any(ActorSelection.class), eqDataExists());
// The results are not inspected; only the call time matters here.
1190 transactionProxy.exists(TestModel.TEST_PATH);
1192 transactionProxy.exists(TestModel.TEST_PATH);
// Throttling case for ready(): one write followed by ready() is timed by
// throttleOperation, which asserts that the sequence takes longer than the operation timeout.
1199 public void testReadyThrottling(){
1201 throttleOperation(new TransactionProxyOperation() {
1203 public void run(TransactionProxy transactionProxy) {
1204 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Presumably stubs a reply for one batched-modification message - confirm.
1206 expectBatchedModifications(1);
// ReadyTransaction requests get incompleteFuture() - presumably never completed (confirm).
1208 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1209 any(ActorSelection.class), any(ReadyTransaction.class));
1211 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1213 transactionProxy.ready();
// Throttling case for ready() after two writes: the sequence is timed by
// throttleOperation, which asserts that it takes longer than the operation timeout.
// NOTE(review): confirm the outstanding-operation limit this call passes to throttleOperation.
1219 public void testReadyThrottlingWithTwoTransactionContexts(){
1221 throttleOperation(new TransactionProxyOperation() {
1223 public void run(TransactionProxy transactionProxy) {
1224 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1225 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1227 expectBatchedModifications(2);
// ReadyTransaction requests get incompleteFuture() - presumably never completed (confirm).
1229 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1230 any(ActorSelection.class), any(ReadyTransaction.class));
1232 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// NOTE(review): carsNode is also written to TestModel.TEST_PATH; the test name suggests a
// second transaction context - confirm whether a cars path was intended here.
1234 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1236 transactionProxy.ready();
// Shared body for the batching tests: with a batch size of 3, issues eight modifications
// followed by ready() on a proxy of the given type and checks that exactly three
// BatchedModifications messages (3 + 3 + 2 modifications) were captured, in call order.
//   type - the transaction type used for the mocked context and the proxy
1241 private void testModificationOperationBatching(TransactionType type) throws Exception {
1242 int shardBatchedModificationCount = 3;
1243 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1245 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1247 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1249 expectReadyTransaction(actorRef);
1251 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1252 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1254 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1255 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1257 YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1258 NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1260 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1261 NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1263 YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1264 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1266 YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1267 NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1269 YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1270 YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1272 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
// Eight modifications in total; the verifications below expect them grouped 3 / 3 / 2.
1274 transactionProxy.write(writePath1, writeNode1);
1275 transactionProxy.write(writePath2, writeNode2);
1276 transactionProxy.delete(deletePath1);
1277 transactionProxy.merge(mergePath1, mergeNode1);
1278 transactionProxy.merge(mergePath2, mergeNode2);
1279 transactionProxy.write(writePath3, writeNode3);
1280 transactionProxy.merge(mergePath3, mergeNode3);
1281 transactionProxy.delete(deletePath2);
1283 // This sends the last batch.
1284 transactionProxy.ready();
1286 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1287 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1289 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1290 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1292 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1293 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
// Only a WRITE_ONLY transaction with the write-only optimization enabled passes true as the
// second argument for the final batch (presumably its "ready" flag - confirm).
1295 boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
1296 verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
1297 new DeleteModification(deletePath2));
// Two recorded reply futures are expected in the optimized case, three otherwise.
// NOTE(review): the two verifications below are presumably the two arms of an if/else - confirm.
1299 if(optimizedWriteOnly) {
1300 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1301 BatchedModificationsReply.class, BatchedModificationsReply.class);
1303 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1304 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
1309 public void testReadWriteModificationOperationBatching() throws Throwable {
1310 testModificationOperationBatching(READ_WRITE);
1314 public void testWriteOnlyModificationOperationBatching() throws Throwable {
1315 testModificationOperationBatching(WRITE_ONLY);
1319 public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1320 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1321 testModificationOperationBatching(WRITE_ONLY);
// Interleaves read()/exists() calls with writes, merges and a delete on a READ_WRITE proxy
// and checks, via captured messages and in-order verification, that a BatchedModifications
// message is sent before each read/exists even though the batch size (10) is never reached.
1325 public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1327 int shardBatchedModificationCount = 10;
1328 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1330 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1332 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1334 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1335 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1337 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1338 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1340 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1341 NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1343 YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1344 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1346 YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
// Stub the serialized read and exists replies the proxy will ask for.
1348 doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1349 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1351 doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1352 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1354 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1355 eq(actorSelection(actorRef)), eqSerializedDataExists());
1357 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
// Group 1: two writes, then a read.
1359 transactionProxy.write(writePath1, writeNode1);
1360 transactionProxy.write(writePath2, writeNode2);
1362 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1363 get(5, TimeUnit.SECONDS);
1365 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1366 assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
// Group 2: two merges, then a read (its result is asserted further down).
1368 transactionProxy.merge(mergePath1, mergeNode1);
1369 transactionProxy.merge(mergePath2, mergeNode2);
1371 readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
// Group 3: one delete, then an exists check.
1373 transactionProxy.delete(deletePath);
1375 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1376 assertEquals("Exists response", true, exists);
1378 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1379 assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
// One captured batch per group: 2 writes, 2 merges, 1 delete.
1381 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1382 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1384 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1385 new WriteModification(writePath2, writeNode2));
1387 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1388 new MergeModification(mergePath2, mergeNode2));
1390 verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
// Message order on the mocked context: batch, read, batch, read, batch, exists.
1392 InOrder inOrder = Mockito.inOrder(mockActorContext);
1393 inOrder.verify(mockActorContext).executeOperationAsync(
1394 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1396 inOrder.verify(mockActorContext).executeOperationAsync(
1397 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1399 inOrder.verify(mockActorContext).executeOperationAsync(
1400 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1402 inOrder.verify(mockActorContext).executeOperationAsync(
1403 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1405 inOrder.verify(mockActorContext).executeOperationAsync(
1406 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1408 inOrder.verify(mockActorContext).executeOperationAsync(
1409 eq(actorSelection(actorRef)), eqSerializedDataExists());
1411 verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
1412 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);