/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.ActorSystem;
29 import akka.actor.Props;
30 import akka.dispatch.Futures;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.Sets;
35 import com.google.common.util.concurrent.CheckedFuture;
36 import com.google.common.util.concurrent.FutureCallback;
37 import com.google.common.util.concurrent.Uninterruptibles;
38 import java.util.Collection;
39 import java.util.List;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicReference;
44 import org.junit.Assert;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.mockito.Mockito;
48 import org.opendaylight.controller.cluster.access.concepts.MemberName;
49 import org.opendaylight.controller.cluster.datastore.config.Configuration;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
52 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
53 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
59 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
60 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
61 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
62 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
63 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
64 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
65 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
66 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
67 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
68 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
70 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
71 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
72 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
75 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
76 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
77 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
78 import scala.concurrent.Promise;
80 @SuppressWarnings("resource")
81 public class TransactionProxyTest extends AbstractTransactionProxyTest {
83 @SuppressWarnings("serial")
84 static class TestException extends RuntimeException {
88 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
92 public void testRead() throws Exception {
93 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
95 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
97 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
98 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
100 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
101 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
103 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
105 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
107 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
108 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
110 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
112 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
114 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
117 @Test(expected = ReadFailedException.class)
118 public void testReadWithInvalidReplyMessageType() throws Exception {
119 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
121 doReturn(Futures.successful(new Object())).when(mockActorContext)
122 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
124 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
126 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
129 @Test(expected = TestException.class)
130 public void testReadWithAsyncRemoteOperatonFailure() throws Exception {
131 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
133 doReturn(Futures.failed(new TestException())).when(mockActorContext)
134 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
136 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
138 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
141 private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) throws Exception {
142 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
144 if (exToThrow instanceof PrimaryNotFoundException) {
145 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
147 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
148 .findPrimaryShardAsync(anyString());
151 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
152 any(ActorSelection.class), any(), any(Timeout.class));
154 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
156 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
159 private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Exception {
160 testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
163 @Test(expected = PrimaryNotFoundException.class)
164 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
165 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
168 @Test(expected = TimeoutException.class)
169 public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
170 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
171 new Exception("reason")));
174 @Test(expected = TestException.class)
175 public void testReadWhenAnyOtherExceptionIsThrown() throws Exception {
176 testReadWithExceptionOnInitialCreateTransaction(new TestException());
180 public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
181 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
183 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
185 expectBatchedModifications(actorRef, 1);
187 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
188 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
190 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
192 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
194 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
195 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
197 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
198 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
200 InOrder inOrder = Mockito.inOrder(mockActorContext);
201 inOrder.verify(mockActorContext).executeOperationAsync(
202 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
204 inOrder.verify(mockActorContext).executeOperationAsync(
205 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
208 @Test(expected = IllegalStateException.class)
209 public void testReadPreConditionCheck() {
210 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
211 transactionProxy.read(TestModel.TEST_PATH);
214 @Test(expected = IllegalArgumentException.class)
215 public void testInvalidCreateTransactionReply() throws Exception {
216 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
218 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
219 .actorSelection(actorRef.path().toString());
221 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
222 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
224 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
225 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
228 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
230 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
234 public void testExists() throws Exception {
235 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
237 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
239 doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
240 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
242 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
244 assertEquals("Exists response", false, exists);
246 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
247 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
249 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
251 assertEquals("Exists response", true, exists);
254 @Test(expected = PrimaryNotFoundException.class)
255 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
256 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
257 proxy -> proxy.exists(TestModel.TEST_PATH));
260 @Test(expected = ReadFailedException.class)
261 public void testExistsWithInvalidReplyMessageType() throws Exception {
262 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
264 doReturn(Futures.successful(new Object())).when(mockActorContext)
265 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
267 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
269 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
272 @Test(expected = TestException.class)
273 public void testExistsWithAsyncRemoteOperatonFailure() throws Exception {
274 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
276 doReturn(Futures.failed(new TestException())).when(mockActorContext)
277 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
279 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
281 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
285 public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
286 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
288 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
290 expectBatchedModifications(actorRef, 1);
292 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
293 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
295 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
297 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
299 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
301 assertEquals("Exists response", true, exists);
303 InOrder inOrder = Mockito.inOrder(mockActorContext);
304 inOrder.verify(mockActorContext).executeOperationAsync(
305 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
307 inOrder.verify(mockActorContext).executeOperationAsync(
308 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
311 @Test(expected = IllegalStateException.class)
312 public void testExistsPreConditionCheck() {
313 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
314 transactionProxy.exists(TestModel.TEST_PATH);
318 public void testWrite() throws Exception {
319 dataStoreContextBuilder.shardBatchedModificationCount(1);
320 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
322 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
324 expectBatchedModifications(actorRef, 1);
326 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
328 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
330 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
334 @SuppressWarnings("checkstyle:IllegalCatch")
335 public void testWriteAfterAsyncRead() throws Exception {
336 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
337 DefaultShardStrategy.DEFAULT_SHARD);
339 Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
340 doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
341 eq(getSystem().actorSelection(actorRef.path())),
342 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
344 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
345 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
347 expectBatchedModificationsReady(actorRef);
349 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
351 final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
353 final CountDownLatch readComplete = new CountDownLatch(1);
354 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
355 com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
356 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
358 public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
360 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
361 } catch (Exception e) {
364 readComplete.countDown();
369 public void onFailure(Throwable failure) {
370 caughtEx.set(failure);
371 readComplete.countDown();
375 createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
377 Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
379 if (caughtEx.get() != null) {
380 Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
381 Throwables.propagate(caughtEx.get());
384 // This sends the batched modification.
385 transactionProxy.ready();
387 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
390 @Test(expected = IllegalStateException.class)
391 public void testWritePreConditionCheck() {
392 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
393 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
396 @Test(expected = IllegalStateException.class)
397 public void testWriteAfterReadyPreConditionCheck() {
398 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
400 transactionProxy.ready();
402 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
406 public void testMerge() throws Exception {
407 dataStoreContextBuilder.shardBatchedModificationCount(1);
408 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
410 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
412 expectBatchedModifications(actorRef, 1);
414 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
416 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
418 verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
422 public void testDelete() throws Exception {
423 dataStoreContextBuilder.shardBatchedModificationCount(1);
424 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
426 expectBatchedModifications(actorRef, 1);
428 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
430 transactionProxy.delete(TestModel.TEST_PATH);
432 verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
436 public void testReadWrite() throws Exception {
437 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
439 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
441 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
442 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
444 expectBatchedModifications(actorRef, 1);
446 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
448 transactionProxy.read(TestModel.TEST_PATH);
450 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
452 transactionProxy.read(TestModel.TEST_PATH);
454 transactionProxy.read(TestModel.TEST_PATH);
456 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
457 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
459 verifyBatchedModifications(batchedModifications.get(0), false,
460 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
464 public void testReadyWithReadWrite() throws Exception {
465 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
467 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
469 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
470 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
472 expectBatchedModificationsReady(actorRef, true);
474 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
476 transactionProxy.read(TestModel.TEST_PATH);
478 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
480 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
482 assertTrue(ready instanceof SingleCommitCohortProxy);
484 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
486 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
487 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
489 verifyBatchedModifications(batchedModifications.get(0), true, true,
490 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
492 assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
496 public void testReadyWithNoModifications() throws Exception {
497 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
499 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
500 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
502 expectBatchedModificationsReady(actorRef, true);
504 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
506 transactionProxy.read(TestModel.TEST_PATH);
508 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
510 assertTrue(ready instanceof SingleCommitCohortProxy);
512 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
514 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
515 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
517 verifyBatchedModifications(batchedModifications.get(0), true, true);
521 public void testReadyWithMultipleShardWrites() throws Exception {
522 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
524 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
526 expectBatchedModificationsReady(actorRef1);
527 expectBatchedModificationsReady(actorRef2);
529 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
531 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
532 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
534 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
536 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
538 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
539 actorSelection(actorRef2));
543 public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
544 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
546 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
548 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
550 expectBatchedModificationsReady(actorRef, true);
552 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
554 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
556 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
558 assertTrue(ready instanceof SingleCommitCohortProxy);
560 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
562 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
563 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
565 verifyBatchedModifications(batchedModifications.get(0), true, true,
566 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
570 public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
571 dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
572 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
574 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
576 expectBatchedModificationsReady(actorRef, true);
578 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
580 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
582 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
584 assertTrue(ready instanceof SingleCommitCohortProxy);
586 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
588 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
589 assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
591 verifyBatchedModifications(batchedModifications.get(0), false,
592 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
594 verifyBatchedModifications(batchedModifications.get(1), true, true);
598 public void testReadyWithReplyFailure() throws Exception {
599 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
601 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
603 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
605 expectFailedBatchedModifications(actorRef);
607 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
609 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
611 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
613 assertTrue(ready instanceof SingleCommitCohortProxy);
615 verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
619 public void testReadyWithDebugContextEnabled() throws Exception {
620 dataStoreContextBuilder.transactionDebugContextEnabled(true);
622 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
624 expectBatchedModificationsReady(actorRef, true);
626 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
628 transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
630 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
632 assertTrue(ready instanceof DebugThreePhaseCommitCohort);
634 verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
638 public void testReadyWithLocalTransaction() throws Exception {
639 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
641 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
642 .actorSelection(shardActorRef.path().toString());
644 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
645 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
647 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
649 expectReadyLocalTransaction(shardActorRef, true);
651 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
652 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
654 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
655 assertTrue(ready instanceof SingleCommitCohortProxy);
656 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
660 public void testReadyWithLocalTransactionWithFailure() throws Exception {
661 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
663 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
664 .actorSelection(shardActorRef.path().toString());
666 DataTree mockDataTree = createDataTree();
667 DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
668 doThrow(new RuntimeException("mock")).when(mockModification).ready();
670 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
671 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
673 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
675 expectReadyLocalTransaction(shardActorRef, true);
677 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
678 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
680 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
681 assertTrue(ready instanceof SingleCommitCohortProxy);
682 verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
685 private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
686 doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
688 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
690 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
692 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
694 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
696 transactionProxy.delete(TestModel.TEST_PATH);
698 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
700 assertTrue(ready instanceof SingleCommitCohortProxy);
702 verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
706 public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
707 testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
711 public void testWriteOnlyTxWithNotInitializedException() throws Exception {
712 testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
716 public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
717 testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
721 public void testReadyWithInvalidReplyMessageType() throws Exception {
722 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
723 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
725 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
727 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
728 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
730 expectBatchedModificationsReady(actorRef2);
732 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
734 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
735 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
737 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
739 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
741 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
742 IllegalArgumentException.class);
746 public void testGetIdentifier() {
747 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
748 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
750 Object id = transactionProxy.getIdentifier();
751 assertNotNull("getIdentifier returned null", id);
752 assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
756 public void testClose() throws Exception {
757 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
759 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
760 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
762 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
764 transactionProxy.read(TestModel.TEST_PATH);
766 transactionProxy.close();
768 verify(mockActorContext).sendOperationAsync(
769 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
772 private interface TransactionProxyOperation {
773 void run(TransactionProxy transactionProxy);
776 private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef) {
777 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
780 private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, DataTree dataTree) {
781 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
785 private void throttleOperation(TransactionProxyOperation operation) {
786 throttleOperation(operation, 1, true);
789 private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound) {
790 throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
791 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
794 private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound,
795 long expectedCompletionTime) {
796 ActorSystem actorSystem = getSystem();
797 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
799 // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
800 // we now allow one extra permit to be allowed for ready
801 doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
802 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
803 .getDatastoreContext();
805 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
806 .actorSelection(shardActorRef.path().toString());
809 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
810 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
811 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
812 .findPrimaryShardAsync(eq("cars"));
815 doReturn(Futures.failed(new Exception("not found")))
816 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
819 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
820 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
823 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
825 long start = System.nanoTime();
827 operation.run(transactionProxy);
829 long end = System.nanoTime();
831 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
832 expectedCompletionTime, end - start),
833 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
837 private void completeOperation(TransactionProxyOperation operation) {
838 completeOperation(operation, true);
841 private void completeOperation(TransactionProxyOperation operation, boolean shardFound) {
842 ActorSystem actorSystem = getSystem();
843 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
845 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
846 .actorSelection(shardActorRef.path().toString());
849 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
850 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
852 doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
853 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
856 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
857 String actorPath = txActorRef.path().toString();
858 CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
859 DataStoreVersions.CURRENT_VERSION);
861 doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
863 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
864 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
867 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
869 long start = System.nanoTime();
871 operation.run(transactionProxy);
873 long end = System.nanoTime();
875 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
876 .getOperationTimeoutInMillis());
877 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
878 expected, end - start), end - start <= expected);
881 private void completeOperationLocal(TransactionProxyOperation operation, DataTree dataTree) {
882 ActorSystem actorSystem = getSystem();
883 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
885 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
886 .actorSelection(shardActorRef.path().toString());
888 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
889 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
891 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
893 long start = System.nanoTime();
895 operation.run(transactionProxy);
897 long end = System.nanoTime();
899 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
900 .getOperationTimeoutInMillis());
901 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
902 end - start <= expected);
905 private static DataTree createDataTree() {
906 DataTree dataTree = mock(DataTree.class);
907 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
908 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
910 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
911 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
916 private static DataTree createDataTree(NormalizedNode<?, ?> readResponse) {
917 DataTree dataTree = mock(DataTree.class);
918 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
919 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
921 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
922 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
923 doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
930 public void testWriteCompletionForLocalShard() {
931 completeOperationLocal(transactionProxy -> {
932 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
934 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
936 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
938 }, createDataTree());
942 public void testWriteThrottlingWhenShardFound() {
943 throttleOperation(transactionProxy -> {
944 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
946 expectIncompleteBatchedModifications();
948 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
950 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
955 public void testWriteThrottlingWhenShardNotFound() {
956 // Confirm that there is no throttling when the Shard is not found
957 completeOperation(transactionProxy -> {
958 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
960 expectBatchedModifications(2);
962 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
964 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
971 public void testWriteCompletion() {
972 completeOperation(transactionProxy -> {
973 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
975 expectBatchedModifications(2);
977 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
979 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
984 public void testMergeThrottlingWhenShardFound() {
985 throttleOperation(transactionProxy -> {
986 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
988 expectIncompleteBatchedModifications();
990 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
992 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
997 public void testMergeThrottlingWhenShardNotFound() {
998 completeOperation(transactionProxy -> {
999 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1001 expectBatchedModifications(2);
1003 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1005 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1010 public void testMergeCompletion() {
1011 completeOperation(transactionProxy -> {
1012 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1014 expectBatchedModifications(2);
1016 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1018 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1024 public void testMergeCompletionForLocalShard() {
1025 completeOperationLocal(transactionProxy -> {
1026 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1028 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1030 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1032 }, createDataTree());
1037 public void testDeleteThrottlingWhenShardFound() {
1039 throttleOperation(transactionProxy -> {
1040 expectIncompleteBatchedModifications();
1042 transactionProxy.delete(TestModel.TEST_PATH);
1044 transactionProxy.delete(TestModel.TEST_PATH);
1050 public void testDeleteThrottlingWhenShardNotFound() {
1052 completeOperation(transactionProxy -> {
1053 expectBatchedModifications(2);
1055 transactionProxy.delete(TestModel.TEST_PATH);
1057 transactionProxy.delete(TestModel.TEST_PATH);
1062 public void testDeleteCompletionForLocalShard() {
1063 completeOperationLocal(transactionProxy -> {
1065 transactionProxy.delete(TestModel.TEST_PATH);
1067 transactionProxy.delete(TestModel.TEST_PATH);
1068 }, createDataTree());
1073 public void testDeleteCompletion() {
1074 completeOperation(transactionProxy -> {
1075 expectBatchedModifications(2);
1077 transactionProxy.delete(TestModel.TEST_PATH);
1079 transactionProxy.delete(TestModel.TEST_PATH);
1085 public void testReadThrottlingWhenShardFound() {
1087 throttleOperation(transactionProxy -> {
1088 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1089 any(ActorSelection.class), eqReadData());
1091 transactionProxy.read(TestModel.TEST_PATH);
1093 transactionProxy.read(TestModel.TEST_PATH);
1098 public void testReadThrottlingWhenShardNotFound() {
1100 completeOperation(transactionProxy -> {
1101 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1102 any(ActorSelection.class), eqReadData());
1104 transactionProxy.read(TestModel.TEST_PATH);
1106 transactionProxy.read(TestModel.TEST_PATH);
1112 public void testReadCompletion() {
1113 completeOperation(transactionProxy -> {
1114 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1116 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1117 any(ActorSelection.class), eqReadData(), any(Timeout.class));
1119 transactionProxy.read(TestModel.TEST_PATH);
1121 transactionProxy.read(TestModel.TEST_PATH);
1127 public void testReadCompletionForLocalShard() {
1128 final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1129 completeOperationLocal(transactionProxy -> {
1130 transactionProxy.read(TestModel.TEST_PATH);
1132 transactionProxy.read(TestModel.TEST_PATH);
1133 }, createDataTree(nodeToRead));
1138 public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1139 completeOperationLocal(transactionProxy -> {
1140 transactionProxy.read(TestModel.TEST_PATH);
1142 transactionProxy.read(TestModel.TEST_PATH);
1143 }, createDataTree());
1148 public void testExistsThrottlingWhenShardFound() {
1150 throttleOperation(transactionProxy -> {
1151 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1152 any(ActorSelection.class), eqDataExists());
1154 transactionProxy.exists(TestModel.TEST_PATH);
1156 transactionProxy.exists(TestModel.TEST_PATH);
1161 public void testExistsThrottlingWhenShardNotFound() {
1163 completeOperation(transactionProxy -> {
1164 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1165 any(ActorSelection.class), eqDataExists());
1167 transactionProxy.exists(TestModel.TEST_PATH);
1169 transactionProxy.exists(TestModel.TEST_PATH);
1175 public void testExistsCompletion() {
1176 completeOperation(transactionProxy -> {
1177 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1178 any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1180 transactionProxy.exists(TestModel.TEST_PATH);
1182 transactionProxy.exists(TestModel.TEST_PATH);
1188 public void testExistsCompletionForLocalShard() {
1189 final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1190 completeOperationLocal(transactionProxy -> {
1191 transactionProxy.exists(TestModel.TEST_PATH);
1193 transactionProxy.exists(TestModel.TEST_PATH);
1194 }, createDataTree(nodeToRead));
1199 public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1200 completeOperationLocal(transactionProxy -> {
1201 transactionProxy.exists(TestModel.TEST_PATH);
1203 transactionProxy.exists(TestModel.TEST_PATH);
1204 }, createDataTree());
1209 public void testReadyThrottling() {
1211 throttleOperation(transactionProxy -> {
1212 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1214 expectBatchedModifications(1);
1216 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1218 transactionProxy.ready();
1223 public void testReadyThrottlingWithTwoTransactionContexts() {
1224 throttleOperation(transactionProxy -> {
1225 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1226 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1228 expectBatchedModifications(2);
1230 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1232 // Trying to write to Cars will cause another transaction context to get created
1233 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1235 // Now ready should block for both transaction contexts
1236 transactionProxy.ready();
1237 }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1238 .getOperationTimeoutInMillis()) * 2);
1241 private void testModificationOperationBatching(TransactionType type) throws Exception {
1242 int shardBatchedModificationCount = 3;
1243 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1245 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1247 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1249 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1250 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1252 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1253 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1255 YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1256 NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1258 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1259 NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1261 YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1262 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1264 YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1265 NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1267 YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1268 YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1270 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1272 transactionProxy.write(writePath1, writeNode1);
1273 transactionProxy.write(writePath2, writeNode2);
1274 transactionProxy.delete(deletePath1);
1275 transactionProxy.merge(mergePath1, mergeNode1);
1276 transactionProxy.merge(mergePath2, mergeNode2);
1277 transactionProxy.write(writePath3, writeNode3);
1278 transactionProxy.merge(mergePath3, mergeNode3);
1279 transactionProxy.delete(deletePath2);
1281 // This sends the last batch.
1282 transactionProxy.ready();
1284 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1285 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1287 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1288 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1290 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1291 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1293 verifyBatchedModifications(batchedModifications.get(2), true, true,
1294 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1296 assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1300 public void testReadWriteModificationOperationBatching() throws Exception {
1301 testModificationOperationBatching(READ_WRITE);
1305 public void testWriteOnlyModificationOperationBatching() throws Exception {
1306 testModificationOperationBatching(WRITE_ONLY);
1310 public void testOptimizedWriteOnlyModificationOperationBatching() throws Exception {
1311 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1312 testModificationOperationBatching(WRITE_ONLY);
1316 public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1318 int shardBatchedModificationCount = 10;
1319 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1321 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1323 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1325 final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1326 final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1328 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1329 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1331 final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1332 final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1334 YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1335 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1337 final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1339 doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1340 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1342 doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1343 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1345 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1346 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1348 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1350 transactionProxy.write(writePath1, writeNode1);
1351 transactionProxy.write(writePath2, writeNode2);
1353 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1355 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1356 assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1358 transactionProxy.merge(mergePath1, mergeNode1);
1359 transactionProxy.merge(mergePath2, mergeNode2);
1361 readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1363 transactionProxy.delete(deletePath);
1365 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1366 assertEquals("Exists response", true, exists);
1368 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1369 assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1371 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1372 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1374 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1375 new WriteModification(writePath2, writeNode2));
1377 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1378 new MergeModification(mergePath2, mergeNode2));
1380 verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1382 InOrder inOrder = Mockito.inOrder(mockActorContext);
1383 inOrder.verify(mockActorContext).executeOperationAsync(
1384 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1386 inOrder.verify(mockActorContext).executeOperationAsync(
1387 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1389 inOrder.verify(mockActorContext).executeOperationAsync(
1390 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1392 inOrder.verify(mockActorContext).executeOperationAsync(
1393 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1395 inOrder.verify(mockActorContext).executeOperationAsync(
1396 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1398 inOrder.verify(mockActorContext).executeOperationAsync(
1399 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1403 public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException,
1404 java.util.concurrent.TimeoutException {
1405 SchemaContext schemaContext = SchemaContextHelper.full();
1406 Configuration configuration = mock(Configuration.class);
1407 doReturn(configuration).when(mockActorContext).getConfiguration();
1408 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1409 doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1411 NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1412 NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1414 setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1415 setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1417 doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1419 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1421 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1423 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1424 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1426 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1428 NormalizedNode<?, ?> normalizedNode = readOptional.get();
1430 assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1432 @SuppressWarnings("unchecked")
1433 Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1435 for (NormalizedNode<?,?> node : collection) {
1436 assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1439 assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1440 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1442 assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1444 assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1445 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1447 assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1451 private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
1452 ActorSystem actorSystem = getSystem();
1453 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1455 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1456 .actorSelection(shardActorRef.path().toString());
1458 doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1459 .findPrimaryShardAsync(eq(shardName));
1461 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1463 doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1464 .actorSelection(txActorRef.path().toString());
1466 doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1467 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1468 eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1470 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1471 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));