1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Matchers.any;
7 import static org.mockito.Matchers.anyString;
8 import static org.mockito.Matchers.eq;
9 import static org.mockito.Matchers.isA;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.never;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
15 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
16 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSelection;
19 import akka.actor.ActorSystem;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.CheckedFuture;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.mockito.InOrder;
36 import org.mockito.Mockito;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
41 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
42 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
43 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
45 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
46 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
47 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
48 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
49 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
50 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
51 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
52 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
53 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
54 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
57 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
59 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
60 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
62 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64 import scala.concurrent.Promise;
66 @SuppressWarnings("resource")
67 public class TransactionProxyTest extends AbstractTransactionProxyTest {
69 @SuppressWarnings("serial")
70 static class TestException extends RuntimeException {
73 static interface Invoker {
74 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
78 public void testRead() throws Exception {
79 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
81 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
83 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
84 eq(actorSelection(actorRef)), eqSerializedReadData());
86 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
87 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
89 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
91 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
93 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
94 eq(actorSelection(actorRef)), eqSerializedReadData());
96 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
98 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
100 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
103 @Test(expected = ReadFailedException.class)
104 public void testReadWithInvalidReplyMessageType() throws Exception {
105 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
107 doReturn(Futures.successful(new Object())).when(mockActorContext).
108 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
110 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
112 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
115 @Test(expected = TestException.class)
116 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
117 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
119 doReturn(Futures.failed(new TestException())).when(mockActorContext).
120 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
122 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
124 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
127 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
129 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
131 if (exToThrow instanceof PrimaryNotFoundException) {
132 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
134 doReturn(primaryShardInfoReply(getSystem(), actorRef)).
135 when(mockActorContext).findPrimaryShardAsync(anyString());
138 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
139 any(ActorSelection.class), any());
141 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
143 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
146 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
147 testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
149 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
150 return proxy.read(TestModel.TEST_PATH);
155 @Test(expected = PrimaryNotFoundException.class)
156 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
157 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
160 @Test(expected = TimeoutException.class)
161 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
162 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
163 new Exception("reason")));
166 @Test(expected = TestException.class)
167 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
168 testReadWithExceptionOnInitialCreateTransaction(new TestException());
172 public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
173 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
175 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
177 expectBatchedModifications(actorRef, 1);
179 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
180 eq(actorSelection(actorRef)), eqSerializedReadData());
182 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
184 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
186 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
187 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
189 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
190 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
192 InOrder inOrder = Mockito.inOrder(mockActorContext);
193 inOrder.verify(mockActorContext).executeOperationAsync(
194 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
196 inOrder.verify(mockActorContext).executeOperationAsync(
197 eq(actorSelection(actorRef)), eqSerializedReadData());
200 @Test(expected=IllegalStateException.class)
201 public void testReadPreConditionCheck() {
202 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
203 transactionProxy.read(TestModel.TEST_PATH);
206 @Test(expected=IllegalArgumentException.class)
207 public void testInvalidCreateTransactionReply() throws Throwable {
208 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
210 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
211 actorSelection(actorRef.path().toString());
213 doReturn(primaryShardInfoReply(getSystem(), actorRef)).
214 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
216 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
217 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
219 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
221 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
225 public void testExists() throws Exception {
226 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
228 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
230 doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
231 eq(actorSelection(actorRef)), eqSerializedDataExists());
233 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
235 assertEquals("Exists response", false, exists);
237 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
238 eq(actorSelection(actorRef)), eqSerializedDataExists());
240 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
242 assertEquals("Exists response", true, exists);
245 @Test(expected = PrimaryNotFoundException.class)
246 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
247 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
249 public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
250 return proxy.exists(TestModel.TEST_PATH);
255 @Test(expected = ReadFailedException.class)
256 public void testExistsWithInvalidReplyMessageType() throws Exception {
257 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
259 doReturn(Futures.successful(new Object())).when(mockActorContext).
260 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
262 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
265 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
268 @Test(expected = TestException.class)
269 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
270 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
272 doReturn(Futures.failed(new TestException())).when(mockActorContext).
273 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
275 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
277 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
281 public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
282 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
284 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
286 expectBatchedModifications(actorRef, 1);
288 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
289 eq(actorSelection(actorRef)), eqSerializedDataExists());
291 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
293 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
295 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
297 assertEquals("Exists response", true, exists);
299 InOrder inOrder = Mockito.inOrder(mockActorContext);
300 inOrder.verify(mockActorContext).executeOperationAsync(
301 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
303 inOrder.verify(mockActorContext).executeOperationAsync(
304 eq(actorSelection(actorRef)), eqSerializedDataExists());
307 @Test(expected=IllegalStateException.class)
308 public void testExistsPreConditionCheck() {
309 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
310 transactionProxy.exists(TestModel.TEST_PATH);
314 public void testWrite() throws Exception {
315 dataStoreContextBuilder.shardBatchedModificationCount(1);
316 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
318 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
320 expectBatchedModifications(actorRef, 1);
322 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
324 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
326 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
330 public void testWriteAfterAsyncRead() throws Throwable {
331 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD);
333 Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
334 doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
335 eq(getSystem().actorSelection(actorRef.path())),
336 eqCreateTransaction(memberName, READ_WRITE));
338 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
339 eq(actorSelection(actorRef)), eqSerializedReadData());
341 expectBatchedModificationsReady(actorRef);
343 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
345 final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
347 final CountDownLatch readComplete = new CountDownLatch(1);
348 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
349 com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
350 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
352 public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
354 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
355 } catch (Exception e) {
358 readComplete.countDown();
363 public void onFailure(Throwable t) {
365 readComplete.countDown();
369 createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
371 Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
373 if(caughtEx.get() != null) {
374 throw caughtEx.get();
377 // This sends the batched modification.
378 transactionProxy.ready();
380 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
383 @Test(expected=IllegalStateException.class)
384 public void testWritePreConditionCheck() {
385 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
386 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
389 @Test(expected=IllegalStateException.class)
390 public void testWriteAfterReadyPreConditionCheck() {
391 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
393 transactionProxy.ready();
395 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
399 public void testMerge() throws Exception {
400 dataStoreContextBuilder.shardBatchedModificationCount(1);
401 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
403 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
405 expectBatchedModifications(actorRef, 1);
407 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
409 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
411 verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
415 public void testDelete() throws Exception {
416 dataStoreContextBuilder.shardBatchedModificationCount(1);
417 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
419 expectBatchedModifications(actorRef, 1);
421 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
423 transactionProxy.delete(TestModel.TEST_PATH);
425 verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
429 public void testReadWrite() throws Exception {
430 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
432 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
434 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
435 eq(actorSelection(actorRef)), eqSerializedReadData());
437 expectBatchedModifications(actorRef, 1);
439 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
441 transactionProxy.read(TestModel.TEST_PATH);
443 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
445 transactionProxy.read(TestModel.TEST_PATH);
447 transactionProxy.read(TestModel.TEST_PATH);
449 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
450 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
452 verifyBatchedModifications(batchedModifications.get(0), false,
453 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
457 public void testReadyWithReadWrite() throws Exception {
458 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
460 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
462 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
463 eq(actorSelection(actorRef)), eqSerializedReadData());
465 expectBatchedModificationsReady(actorRef, true);
467 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
469 transactionProxy.read(TestModel.TEST_PATH);
471 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
473 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
475 assertTrue(ready instanceof SingleCommitCohortProxy);
477 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
479 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
480 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
482 verifyBatchedModifications(batchedModifications.get(0), true, true,
483 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
485 assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
489 public void testReadyWithNoModifications() throws Exception {
490 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
492 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
493 eq(actorSelection(actorRef)), eqSerializedReadData());
495 expectBatchedModificationsReady(actorRef, true);
497 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
499 transactionProxy.read(TestModel.TEST_PATH);
501 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
503 assertTrue(ready instanceof SingleCommitCohortProxy);
505 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
507 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
508 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
510 verifyBatchedModifications(batchedModifications.get(0), true, true);
514 public void testReadyWithMultipleShardWrites() throws Exception {
515 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
517 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
519 expectBatchedModificationsReady(actorRef1);
520 expectBatchedModificationsReady(actorRef2);
522 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
524 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
525 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
527 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
529 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
531 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
532 actorSelection(actorRef2));
536 public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
537 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
539 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
541 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
543 expectBatchedModificationsReady(actorRef, true);
545 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
547 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
549 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
551 assertTrue(ready instanceof SingleCommitCohortProxy);
553 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
555 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
556 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
558 verifyBatchedModifications(batchedModifications.get(0), true, true,
559 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
561 verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
562 isA(ReadyTransaction.SERIALIZABLE_CLASS));
566 public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
567 dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
568 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
570 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
572 expectBatchedModificationsReady(actorRef, true);
574 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
576 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
578 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
580 assertTrue(ready instanceof SingleCommitCohortProxy);
582 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
584 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
585 assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
587 verifyBatchedModifications(batchedModifications.get(0), false,
588 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
590 verifyBatchedModifications(batchedModifications.get(1), true, true);
592 verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
593 isA(ReadyTransaction.SERIALIZABLE_CLASS));
597 public void testReadyWithReplyFailure() throws Exception {
598 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
600 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
602 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
604 expectFailedBatchedModifications(actorRef);
606 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
608 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
610 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
612 assertTrue(ready instanceof SingleCommitCohortProxy);
614 verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
617 private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
618 doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
620 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
622 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
624 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
626 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
628 transactionProxy.delete(TestModel.TEST_PATH);
630 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
632 assertTrue(ready instanceof SingleCommitCohortProxy);
634 verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
638 public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
639 testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
643 public void testWriteOnlyTxWithNotInitializedException() throws Exception {
644 testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
648 public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
649 testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
653 public void testReadyWithInvalidReplyMessageType() throws Exception {
654 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
655 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
657 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
659 doReturn(Futures.successful(new Object())).when(mockActorContext).
660 executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class));
662 expectBatchedModificationsReady(actorRef2);
664 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
666 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
667 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
669 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
671 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
673 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
674 IllegalArgumentException.class);
678 public void testGetIdentifier() {
679 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
680 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
681 TransactionType.READ_ONLY);
683 Object id = transactionProxy.getIdentifier();
684 assertNotNull("getIdentifier returned null", id);
685 assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
689 public void testClose() throws Exception{
690 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
692 doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
693 eq(actorSelection(actorRef)), eqSerializedReadData());
695 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
697 transactionProxy.read(TestModel.TEST_PATH);
699 transactionProxy.close();
701 verify(mockActorContext).sendOperationAsync(
702 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
707 * Method to test a local Tx actor. The Tx paths are matched to decide if the
708 * Tx actor is local or not. This is done by mocking the Tx actor path
709 * and the caller paths and ensuring that the paths have the remote-address format
711 * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
712 * the paths returned for the actors for all the tests are not qualified remote paths.
713 * Hence are treated as non-local/remote actors. In short, all tests except
714 * few below run for remote actors
719 public void testLocalTxActorRead() throws Exception {
720 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
721 doReturn(true).when(mockActorContext).isPathLocal(anyString());
723 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
725 // negative test case with null as the reply
726 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
727 any(ActorSelection.class), eqReadData());
729 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
730 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
732 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
734 // test case with node as read data reply
735 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
737 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
738 any(ActorSelection.class), eqReadData());
740 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
742 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
744 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
746 // test for local data exists
747 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
748 any(ActorSelection.class), eqDataExists());
750 boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
752 assertEquals("Exists response", true, exists);
756 public void testLocalTxActorReady() throws Exception {
757 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
758 doReturn(true).when(mockActorContext).isPathLocal(anyString());
760 expectBatchedModificationsReady(actorRef, true);
762 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
764 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
765 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
767 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
769 assertTrue(ready instanceof SingleCommitCohortProxy);
771 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
774 private static interface TransactionProxyOperation {
775 void run(TransactionProxy transactionProxy);
778 private void throttleOperation(TransactionProxyOperation operation) {
779 throttleOperation(operation, 1, true);
782 private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
783 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.<DataTree>absent());
786 private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
787 ActorSystem actorSystem = getSystem();
788 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
790 doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
792 doReturn(actorSystem.actorSelection(shardActorRef.path())).
793 when(mockActorContext).actorSelection(shardActorRef.path().toString());
796 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
797 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
799 doReturn(Futures.failed(new Exception("not found")))
800 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
803 String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
804 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
805 setTransactionId("txn-1").setTransactionActorPath(actorPath).
806 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
808 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
809 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
810 eqCreateTransaction(memberName, READ_WRITE));
812 doReturn(true).when(mockActorContext).isPathLocal(actorPath);
814 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
816 long start = System.nanoTime();
818 operation.run(transactionProxy);
820 long end = System.nanoTime();
822 long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
823 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
824 expected, (end-start)), (end - start) > expected);
828 private void completeOperation(TransactionProxyOperation operation){
829 completeOperation(operation, true);
832 private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
833 ActorSystem actorSystem = getSystem();
834 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
836 doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
838 doReturn(actorSystem.actorSelection(shardActorRef.path())).
839 when(mockActorContext).actorSelection(shardActorRef.path().toString());
842 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
843 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
845 doReturn(Futures.failed(new PrimaryNotFoundException("test")))
846 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
849 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
850 String actorPath = txActorRef.path().toString();
851 CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
852 setTransactionId("txn-1").setTransactionActorPath(actorPath).
853 setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
855 doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
857 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
858 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
859 eqCreateTransaction(memberName, READ_WRITE));
861 doReturn(true).when(mockActorContext).isPathLocal(anyString());
863 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
865 long start = System.nanoTime();
867 operation.run(transactionProxy);
869 long end = System.nanoTime();
871 long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
872 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
873 expected, (end-start)), (end - start) <= expected);
877 public void testWriteThrottlingWhenShardFound(){
878 dataStoreContextBuilder.shardBatchedModificationCount(1);
879 throttleOperation(new TransactionProxyOperation() {
881 public void run(TransactionProxy transactionProxy) {
882 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
884 expectIncompleteBatchedModifications();
886 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
888 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
894 public void testWriteThrottlingWhenShardNotFound(){
895 // Confirm that there is no throttling when the Shard is not found
896 dataStoreContextBuilder.shardBatchedModificationCount(1);
897 completeOperation(new TransactionProxyOperation() {
899 public void run(TransactionProxy transactionProxy) {
900 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
902 expectBatchedModifications(2);
904 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
906 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
914 public void testWriteCompletion(){
915 dataStoreContextBuilder.shardBatchedModificationCount(1);
916 completeOperation(new TransactionProxyOperation() {
918 public void run(TransactionProxy transactionProxy) {
919 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
921 expectBatchedModifications(2);
923 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
925 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
931 public void testMergeThrottlingWhenShardFound(){
932 dataStoreContextBuilder.shardBatchedModificationCount(1);
933 throttleOperation(new TransactionProxyOperation() {
935 public void run(TransactionProxy transactionProxy) {
936 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
938 expectIncompleteBatchedModifications();
940 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
942 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
948 public void testMergeThrottlingWhenShardNotFound(){
949 dataStoreContextBuilder.shardBatchedModificationCount(1);
950 completeOperation(new TransactionProxyOperation() {
952 public void run(TransactionProxy transactionProxy) {
953 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
955 expectBatchedModifications(2);
957 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
959 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
965 public void testMergeCompletion(){
966 dataStoreContextBuilder.shardBatchedModificationCount(1);
967 completeOperation(new TransactionProxyOperation() {
969 public void run(TransactionProxy transactionProxy) {
970 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
972 expectBatchedModifications(2);
974 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
976 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
983 public void testDeleteThrottlingWhenShardFound(){
985 throttleOperation(new TransactionProxyOperation() {
987 public void run(TransactionProxy transactionProxy) {
988 expectIncompleteBatchedModifications();
990 transactionProxy.delete(TestModel.TEST_PATH);
992 transactionProxy.delete(TestModel.TEST_PATH);
999 public void testDeleteThrottlingWhenShardNotFound(){
1001 completeOperation(new TransactionProxyOperation() {
1003 public void run(TransactionProxy transactionProxy) {
1004 expectBatchedModifications(2);
1006 transactionProxy.delete(TestModel.TEST_PATH);
1008 transactionProxy.delete(TestModel.TEST_PATH);
1014 public void testDeleteCompletion(){
1015 dataStoreContextBuilder.shardBatchedModificationCount(1);
1016 completeOperation(new TransactionProxyOperation() {
1018 public void run(TransactionProxy transactionProxy) {
1019 expectBatchedModifications(2);
1021 transactionProxy.delete(TestModel.TEST_PATH);
1023 transactionProxy.delete(TestModel.TEST_PATH);
1030 public void testReadThrottlingWhenShardFound(){
1032 throttleOperation(new TransactionProxyOperation() {
1034 public void run(TransactionProxy transactionProxy) {
1035 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1036 any(ActorSelection.class), eqReadData());
1038 transactionProxy.read(TestModel.TEST_PATH);
1040 transactionProxy.read(TestModel.TEST_PATH);
1046 public void testReadThrottlingWhenShardNotFound(){
1048 completeOperation(new TransactionProxyOperation() {
1050 public void run(TransactionProxy transactionProxy) {
1051 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1052 any(ActorSelection.class), eqReadData());
1054 transactionProxy.read(TestModel.TEST_PATH);
1056 transactionProxy.read(TestModel.TEST_PATH);
1063 public void testReadCompletion(){
1064 completeOperation(new TransactionProxyOperation() {
1066 public void run(TransactionProxy transactionProxy) {
1067 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1069 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1070 any(ActorSelection.class), eqReadData());
1072 transactionProxy.read(TestModel.TEST_PATH);
1074 transactionProxy.read(TestModel.TEST_PATH);
1081 public void testExistsThrottlingWhenShardFound(){
1083 throttleOperation(new TransactionProxyOperation() {
1085 public void run(TransactionProxy transactionProxy) {
1086 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1087 any(ActorSelection.class), eqDataExists());
1089 transactionProxy.exists(TestModel.TEST_PATH);
1091 transactionProxy.exists(TestModel.TEST_PATH);
1097 public void testExistsThrottlingWhenShardNotFound(){
1099 completeOperation(new TransactionProxyOperation() {
1101 public void run(TransactionProxy transactionProxy) {
1102 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1103 any(ActorSelection.class), eqDataExists());
1105 transactionProxy.exists(TestModel.TEST_PATH);
1107 transactionProxy.exists(TestModel.TEST_PATH);
1114 public void testExistsCompletion(){
1115 completeOperation(new TransactionProxyOperation() {
1117 public void run(TransactionProxy transactionProxy) {
1118 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1119 any(ActorSelection.class), eqDataExists());
1121 transactionProxy.exists(TestModel.TEST_PATH);
1123 transactionProxy.exists(TestModel.TEST_PATH);
1130 public void testReadyThrottling(){
1132 throttleOperation(new TransactionProxyOperation() {
1134 public void run(TransactionProxy transactionProxy) {
1135 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1137 expectBatchedModifications(1);
1139 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1140 any(ActorSelection.class), any(ReadyTransaction.class));
1142 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1144 transactionProxy.ready();
1150 public void testReadyThrottlingWithTwoTransactionContexts(){
1152 throttleOperation(new TransactionProxyOperation() {
1154 public void run(TransactionProxy transactionProxy) {
1155 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1156 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1158 expectBatchedModifications(2);
1160 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1161 any(ActorSelection.class), any(ReadyTransaction.class));
1163 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1165 transactionProxy.write(TestModel.TEST_PATH, carsNode);
1167 transactionProxy.ready();
1172 private void testModificationOperationBatching(TransactionType type) throws Exception {
1173 int shardBatchedModificationCount = 3;
1174 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1176 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1178 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1180 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1181 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1183 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1184 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1186 YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1187 NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1189 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1190 NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1192 YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1193 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1195 YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1196 NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1198 YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1199 YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1201 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
1203 transactionProxy.write(writePath1, writeNode1);
1204 transactionProxy.write(writePath2, writeNode2);
1205 transactionProxy.delete(deletePath1);
1206 transactionProxy.merge(mergePath1, mergeNode1);
1207 transactionProxy.merge(mergePath2, mergeNode2);
1208 transactionProxy.write(writePath3, writeNode3);
1209 transactionProxy.merge(mergePath3, mergeNode3);
1210 transactionProxy.delete(deletePath2);
1212 // This sends the last batch.
1213 transactionProxy.ready();
1215 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1216 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1218 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1219 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1221 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1222 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1224 verifyBatchedModifications(batchedModifications.get(2), true, true,
1225 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1227 assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1231 public void testReadWriteModificationOperationBatching() throws Throwable {
1232 testModificationOperationBatching(READ_WRITE);
1236 public void testWriteOnlyModificationOperationBatching() throws Throwable {
1237 testModificationOperationBatching(WRITE_ONLY);
1241 public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
1242 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1243 testModificationOperationBatching(WRITE_ONLY);
1247 public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
1249 int shardBatchedModificationCount = 10;
1250 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1252 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1254 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1256 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1257 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1259 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1260 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1262 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1263 NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1265 YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1266 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1268 YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1270 doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1271 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1273 doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1274 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1276 doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
1277 eq(actorSelection(actorRef)), eqSerializedDataExists());
1279 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
1281 transactionProxy.write(writePath1, writeNode1);
1282 transactionProxy.write(writePath2, writeNode2);
1284 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
1285 get(5, TimeUnit.SECONDS);
1287 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1288 assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1290 transactionProxy.merge(mergePath1, mergeNode1);
1291 transactionProxy.merge(mergePath2, mergeNode2);
1293 readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1295 transactionProxy.delete(deletePath);
1297 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1298 assertEquals("Exists response", true, exists);
1300 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1301 assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1303 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1304 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1306 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1307 new WriteModification(writePath2, writeNode2));
1309 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1310 new MergeModification(mergePath2, mergeNode2));
1312 verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1314 InOrder inOrder = Mockito.inOrder(mockActorContext);
1315 inOrder.verify(mockActorContext).executeOperationAsync(
1316 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1318 inOrder.verify(mockActorContext).executeOperationAsync(
1319 eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
1321 inOrder.verify(mockActorContext).executeOperationAsync(
1322 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1324 inOrder.verify(mockActorContext).executeOperationAsync(
1325 eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
1327 inOrder.verify(mockActorContext).executeOperationAsync(
1328 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
1330 inOrder.verify(mockActorContext).executeOperationAsync(
1331 eq(actorSelection(actorRef)), eqSerializedDataExists());
1335 public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1337 SchemaContext schemaContext = SchemaContextHelper.full();
1338 Configuration configuration = mock(Configuration.class);
1339 doReturn(configuration).when(mockActorContext).getConfiguration();
1340 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1341 doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1343 NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1344 NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1346 setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1347 setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1349 doReturn(memberName).when(mockActorContext).getCurrentMemberName();
1351 doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
1353 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1355 TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
1357 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1358 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
1360 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1362 NormalizedNode<?, ?> normalizedNode = readOptional.get();
1364 assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1366 Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1368 for(NormalizedNode<?,?> node : collection){
1369 assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1372 assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1373 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1375 assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1377 assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1378 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1380 assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1384 private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1385 ActorSystem actorSystem = getSystem();
1386 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1388 doReturn(getSystem().actorSelection(shardActorRef.path())).
1389 when(mockActorContext).actorSelection(shardActorRef.path().toString());
1391 doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
1392 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
1394 doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
1396 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1398 doReturn(actorSystem.actorSelection(txActorRef.path())).
1399 when(mockActorContext).actorSelection(txActorRef.path().toString());
1401 doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
1402 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1403 eqCreateTransaction(memberName, TransactionType.READ_ONLY));
1405 doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1406 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));