2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.ActorSystem;
29 import akka.actor.Props;
30 import akka.dispatch.Futures;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.Sets;
35 import com.google.common.util.concurrent.CheckedFuture;
36 import com.google.common.util.concurrent.FutureCallback;
37 import com.google.common.util.concurrent.MoreExecutors;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import java.util.Collection;
40 import java.util.List;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.Assert;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.mockito.Mockito;
49 import org.opendaylight.controller.cluster.access.concepts.MemberName;
50 import org.opendaylight.controller.cluster.datastore.config.Configuration;
51 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
52 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
53 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
54 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
56 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
59 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
60 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
61 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
62 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
63 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
64 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
65 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
66 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
67 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
68 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
69 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
72 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
73 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
75 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
76 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
77 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
78 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
79 import scala.concurrent.Promise;
81 @SuppressWarnings("resource")
82 public class TransactionProxyTest extends AbstractTransactionProxyTest {
/** Unchecked exception type that the tests in this class inject as a recognizable failure. */
84 @SuppressWarnings("serial")
85 static class TestException extends RuntimeException {
/**
 * Runs a single operation against the given proxy.
 *
 * @param proxy the transaction proxy to operate on
 * @return the future produced by the invoked operation
 * @throws Exception if invoking the operation fails
 */
89 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
/**
 * Reads the same path twice on a READ_ONLY proxy: first with a stubbed reply built from
 * {@code readDataReply(null)}, then with a reply carrying a container node, and checks the
 * returned Optional each time.
 */
93 public void testRead() throws Exception {
94 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
96 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
// First pass: the stubbed read reply is built from a null node.
98 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
99 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
101 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
102 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
104 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
// Second pass: re-stub the same call so that the reply carries a real node.
106 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
108 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
109 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
111 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
113 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
115 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
/**
 * Expects a ReadFailedException from {@code checkedGet} when the read operation completes
 * successfully but with a plain {@code Object} as the reply.
 */
118 @Test(expected = ReadFailedException.class)
119 public void testReadWithInvalidReplyMessageType() throws Exception {
120 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
// The reply future succeeds, but its value is a bare Object rather than a read reply.
122 doReturn(Futures.successful(new Object())).when(mockActorContext)
123 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
125 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
127 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
/**
 * Expects a TestException to surface when the read operation's future fails with one.
 */
130 @Test(expected = TestException.class)
131 public void testReadWithAsyncRemoteOperatonFailure() throws Exception {
132 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
// The read operation's future is stubbed as already failed.
134 doReturn(Futures.failed(new TestException())).when(mockActorContext)
135 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
137 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
// NOTE(review): presumably rethrows the cause of the ReadFailedException - confirm in the helper.
139 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
/**
 * Shared driver for tests in which setting up the transaction fails: stubs the mock actor
 * context to fail with {@code exToThrow}, creates a READ_ONLY proxy and runs {@code invoker}
 * against it.
 *
 * @param exToThrow the failure to inject
 * @param invoker the proxy operation to exercise
 */
142 private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
144 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
// A PrimaryNotFoundException is injected at the shard-lookup step.
146 if (exToThrow instanceof PrimaryNotFoundException) {
147 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
// Shard lookup stubbed to answer with a reply built for the do-nothing actor.
149 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
150 .findPrimaryShardAsync(anyString());
// Every asynchronous operation on any actor selection fails with the injected exception.
153 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
154 any(ActorSelection.class), any(), any(Timeout.class));
156 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
// NOTE(review): presumably rethrows the cause of the ReadFailedException - confirm in the helper.
158 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
/**
 * Runs {@link #testExceptionOnInitialCreateTransaction} with a read of TestModel.TEST_PATH as
 * the exercised operation.
 *
 * @param exToThrow the failure to inject
 */
161 private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Exception {
162 testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
/** Expects a read to surface an injected PrimaryNotFoundException. */
165 @Test(expected = PrimaryNotFoundException.class)
166 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
167 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
/** Expects a read to surface an injected TimeoutException (constructed with a nested cause). */
170 @Test(expected = TimeoutException.class)
171 public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
172 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
173 new Exception("reason")));
/** Expects a read to surface an injected TestException, i.e. a failure type with no special handling here. */
176 @Test(expected = TestException.class)
177 public void testReadWhenAnyOtherExceptionIsThrown() throws Exception {
178 testReadWithExceptionOnInitialCreateTransaction(new TestException());
/**
 * Writes and then reads the same path on a READ_WRITE proxy, checks the read result, and
 * verifies that the BatchedModifications message was sent before the read message.
 */
182 public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
183 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
185 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
187 expectBatchedModifications(actorRef, 1);
189 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
190 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
192 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
194 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
196 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
197 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
199 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
200 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
// Ordering check: the batched write must reach the actor context before the read.
202 InOrder inOrder = Mockito.inOrder(mockActorContext);
203 inOrder.verify(mockActorContext).executeOperationAsync(
204 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
206 inOrder.verify(mockActorContext).executeOperationAsync(
207 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
/** Expects an IllegalStateException when read is called on a WRITE_ONLY proxy. */
210 @Test(expected = IllegalStateException.class)
211 public void testReadPreConditionCheck() {
212 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
213 transactionProxy.read(TestModel.TEST_PATH);
/**
 * Expects an IllegalArgumentException when the create-transaction operation completes with a
 * plain {@code Object} as its reply and a read is then attempted.
 */
216 @Test(expected = IllegalArgumentException.class)
217 public void testInvalidCreateTransactionReply() throws Exception {
218 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
// Resolve the actor's path string to a real selection on the test actor system.
220 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
221 .actorSelection(actorRef.path().toString());
223 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
224 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// The create-transaction request succeeds with a bare Object as its reply.
226 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
227 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
230 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
// NOTE(review): presumably rethrows the cause of the ReadFailedException - confirm in the helper.
232 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
/**
 * Calls exists twice on a READ_ONLY proxy, with the reply stubbed first as false and then as
 * true, and checks that each stubbed value is returned.
 */
236 public void testExists() throws Exception {
237 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
239 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
241 doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
242 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
244 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
246 assertEquals("Exists response", false, exists);
// Re-stub the same call with the opposite answer.
248 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
249 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
251 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
253 assertEquals("Exists response", true, exists);
/** Expects an exists call to surface an injected PrimaryNotFoundException. */
256 @Test(expected = PrimaryNotFoundException.class)
257 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
258 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
259 proxy -> proxy.exists(TestModel.TEST_PATH));
/**
 * Expects a ReadFailedException from {@code checkedGet} when the exists operation completes
 * successfully but with a plain {@code Object} as the reply.
 */
262 @Test(expected = ReadFailedException.class)
263 public void testExistsWithInvalidReplyMessageType() throws Exception {
264 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
// The reply future succeeds, but its value is a bare Object rather than an exists reply.
266 doReturn(Futures.successful(new Object())).when(mockActorContext)
267 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
269 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
271 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
/**
 * Expects a TestException to surface when the exists operation's future fails with one.
 */
274 @Test(expected = TestException.class)
275 public void testExistsWithAsyncRemoteOperatonFailure() throws Exception {
276 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
// The exists operation's future is stubbed as already failed.
278 doReturn(Futures.failed(new TestException())).when(mockActorContext)
279 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
281 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
// NOTE(review): presumably rethrows the cause of the ReadFailedException - confirm in the helper.
283 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
/**
 * Writes and then calls exists on a READ_WRITE proxy, checks the exists result, and verifies
 * that the BatchedModifications message was sent before the exists message.
 */
287 public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
288 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
290 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
292 expectBatchedModifications(actorRef, 1);
294 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
295 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
297 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
299 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
301 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
303 assertEquals("Exists response", true, exists);
// Ordering check: the batched write must reach the actor context before the exists query.
305 InOrder inOrder = Mockito.inOrder(mockActorContext);
306 inOrder.verify(mockActorContext).executeOperationAsync(
307 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
309 inOrder.verify(mockActorContext).executeOperationAsync(
310 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
/** Expects an IllegalStateException when exists is called on a WRITE_ONLY proxy. */
313 @Test(expected = IllegalStateException.class)
314 public void testExistsPreConditionCheck() {
315 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
316 transactionProxy.exists(TestModel.TEST_PATH);
/**
 * Writes one node on a WRITE_ONLY proxy with the shard batched-modification count set to 1 and
 * verifies a single batched WriteModification for that path and node.
 */
320 public void testWrite() throws Exception {
// Batch count of 1 is configured before the actor context is set up.
321 dataStoreContextBuilder.shardBatchedModificationCount(1);
322 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
324 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
326 expectBatchedModifications(actorRef, 1);
328 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
330 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
332 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
/**
 * Issues a read while the create-transaction reply is still pending, performs a write from the
 * read's success callback, then completes the create-transaction promise and verifies that
 * ready() sends one batched WriteModification.
 */
336 @SuppressWarnings("checkstyle:IllegalCatch")
337 public void testWriteAfterAsyncRead() throws Exception {
338 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
339 DefaultShardStrategy.DEFAULT_SHARD);
// The create-transaction reply is a promise that this test completes later, by hand.
341 Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
342 doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
343 eq(getSystem().actorSelection(actorRef.path())),
344 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
346 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
347 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
349 expectBatchedModificationsReady(actorRef);
351 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
353 final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
// The latch signals that the read callback ran; caughtEx carries any failure out of it.
355 final CountDownLatch readComplete = new CountDownLatch(1);
356 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
357 com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
358 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
360 public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
// The write is issued from inside the read callback.
362 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
363 } catch (Exception e) {
366 readComplete.countDown();
371 public void onFailure(final Throwable failure) {
372 caughtEx.set(failure);
373 readComplete.countDown();
// directExecutor: the callback runs on whichever thread completes the read future.
375 }, MoreExecutors.directExecutor());
// Completing the promise lets the pending create-transaction step proceed.
377 createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
// NOTE(review): the boolean result of this wait is discarded, so a timeout is not reported here.
379 Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
// Rethrow whatever the callback captured so that the test fails with the original cause.
381 final Throwable t = caughtEx.get();
383 Throwables.propagateIfPossible(t, Exception.class);
384 throw new RuntimeException(t);
387 // This sends the batched modification.
388 transactionProxy.ready();
390 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
/** Expects an IllegalStateException when write is called on a READ_ONLY proxy. */
393 @Test(expected = IllegalStateException.class)
394 public void testWritePreConditionCheck() {
395 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
396 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/** Expects an IllegalStateException when write is called after ready() on a WRITE_ONLY proxy. */
399 @Test(expected = IllegalStateException.class)
400 public void testWriteAfterReadyPreConditionCheck() {
401 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
403 transactionProxy.ready();
// The transaction has already been readied, so this write is the call under test.
405 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
/**
 * Merges one node on a WRITE_ONLY proxy with the shard batched-modification count set to 1 and
 * verifies a single batched MergeModification for that path and node.
 */
409 public void testMerge() throws Exception {
410 dataStoreContextBuilder.shardBatchedModificationCount(1);
411 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
413 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
415 expectBatchedModifications(actorRef, 1);
417 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
419 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
421 verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
/**
 * Deletes a path on a WRITE_ONLY proxy with the shard batched-modification count set to 1 and
 * verifies a single batched DeleteModification for that path.
 */
425 public void testDelete() throws Exception {
426 dataStoreContextBuilder.shardBatchedModificationCount(1);
427 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
429 expectBatchedModifications(actorRef, 1);
431 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
433 transactionProxy.delete(TestModel.TEST_PATH);
435 verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
/**
 * Interleaves reads and one write on a READ_WRITE proxy and checks that exactly one
 * BatchedModifications message was captured, containing the single WriteModification.
 */
439 public void testReadWrite() throws Exception {
440 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
442 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
444 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
445 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
447 expectBatchedModifications(actorRef, 1);
449 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
// Sequence under test: read, write, read, read.
451 transactionProxy.read(TestModel.TEST_PATH);
453 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
455 transactionProxy.read(TestModel.TEST_PATH);
457 transactionProxy.read(TestModel.TEST_PATH);
459 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
460 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
462 verifyBatchedModifications(batchedModifications.get(0), false,
463 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
/**
 * Reads, writes and readies a READ_WRITE proxy; expects a SingleCommitCohortProxy and exactly
 * one captured BatchedModifications message holding the write, with a total message count of 1.
 */
467 public void testReadyWithReadWrite() throws Exception {
468 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
470 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
472 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
473 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
475 expectBatchedModificationsReady(actorRef, true);
477 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
479 transactionProxy.read(TestModel.TEST_PATH);
481 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
483 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
485 assertTrue(ready instanceof SingleCommitCohortProxy);
487 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
489 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
490 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
492 verifyBatchedModifications(batchedModifications.get(0), true, true,
493 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
495 assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
/**
 * Reads and then readies a READ_WRITE proxy without any modification; expects a
 * SingleCommitCohortProxy and one captured BatchedModifications message with no modifications
 * listed.
 */
499 public void testReadyWithNoModifications() throws Exception {
500 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
502 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
503 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
505 expectBatchedModificationsReady(actorRef, true);
507 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
509 transactionProxy.read(TestModel.TEST_PATH);
511 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
513 assertTrue(ready instanceof SingleCommitCohortProxy);
515 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
517 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
518 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
// No expected modifications are passed to the verifier for this message.
520 verifyBatchedModifications(batchedModifications.get(0), true, true);
/**
 * Writes to two paths backed by two separately set-up actors (the second set up with "junk")
 * and expects ready() to return a ThreePhaseCommitCohortProxy covering both actor selections.
 */
524 public void testReadyWithMultipleShardWrites() throws Exception {
525 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
527 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
529 expectBatchedModificationsReady(actorRef1);
530 expectBatchedModificationsReady(actorRef2);
532 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
534 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
535 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
537 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
539 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
541 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
542 actorSelection(actorRef2));
/**
 * With write-only transaction optimizations enabled, writes once and readies a WRITE_ONLY
 * proxy; expects a SingleCommitCohortProxy and a single captured BatchedModifications message
 * that carries the write.
 */
546 public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
547 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
549 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
551 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
553 expectBatchedModificationsReady(actorRef, true);
555 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
557 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
559 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
561 assertTrue(ready instanceof SingleCommitCohortProxy);
563 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
565 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
566 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
568 verifyBatchedModifications(batchedModifications.get(0), true, true,
569 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
/**
 * With a batched-modification count of 1 and write-only optimizations enabled, writes once and
 * readies a WRITE_ONLY proxy; expects two captured BatchedModifications messages, the first
 * carrying the write and the second carrying no modifications.
 */
573 public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
574 dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
575 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
577 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
579 expectBatchedModificationsReady(actorRef, true);
581 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
583 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
585 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
587 assertTrue(ready instanceof SingleCommitCohortProxy);
589 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
591 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
592 assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
// First message: the write itself.
594 verifyBatchedModifications(batchedModifications.get(0), false,
595 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
// Second message: no modifications are expected in it.
597 verifyBatchedModifications(batchedModifications.get(1), true, true);
/**
 * With write-only optimizations enabled and batched modifications set up to fail, merges once
 * and readies a WRITE_ONLY proxy; the cohort futures are verified against TestException.
 */
601 public void testReadyWithReplyFailure() throws Exception {
602 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
604 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
606 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
608 expectFailedBatchedModifications(actorRef);
610 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
612 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
614 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
616 assertTrue(ready instanceof SingleCommitCohortProxy);
618 verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
/**
 * With the transaction debug context enabled, merges once and readies a READ_WRITE proxy;
 * expects the returned cohort to be a DebugThreePhaseCommitCohort.
 */
622 public void testReadyWithDebugContextEnabled() throws Exception {
623 dataStoreContextBuilder.transactionDebugContextEnabled(true);
625 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
627 expectBatchedModificationsReady(actorRef, true);
629 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
631 transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
633 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
635 assertTrue(ready instanceof DebugThreePhaseCommitCohort);
637 verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
/**
 * Stubs the primary-shard lookup to return shard info that includes a data tree, then writes
 * and readies a WRITE_ONLY proxy; expects a SingleCommitCohortProxy.
 */
641 public void testReadyWithLocalTransaction() throws Exception {
642 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
644 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
645 .actorSelection(shardActorRef.path().toString());
// The shard info handed back by the lookup carries a data tree from createDataTree().
647 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
648 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
650 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
652 expectReadyLocalTransaction(shardActorRef, true);
654 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
655 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
657 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
658 assertTrue(ready instanceof SingleCommitCohortProxy);
659 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
/**
 * Same setup as the local-transaction ready test, except that the data tree's modification is
 * stubbed to throw from ready(); the cohort futures are verified against RuntimeException.
 */
663 public void testReadyWithLocalTransactionWithFailure() throws Exception {
664 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
666 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
667 .actorSelection(shardActorRef.path().toString());
// NOTE(review): doThrow(...).when(...) needs a Mockito mock, so createDataTree() presumably
// returns mocks all the way down to the modification - confirm.
669 DataTree mockDataTree = createDataTree();
670 DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
671 doThrow(new RuntimeException("mock")).when(mockModification).ready();
673 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
674 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
676 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
678 expectReadyLocalTransaction(shardActorRef, true);
680 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
681 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
683 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
684 assertTrue(ready instanceof SingleCommitCohortProxy);
685 verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
/**
 * Shared driver: makes every primary-shard lookup fail with {@code toThrow}, performs a merge,
 * a write and a delete on a WRITE_ONLY proxy, readies it, and verifies the cohort futures
 * against the class of {@code toThrow}.
 *
 * @param toThrow the failure returned by the shard lookup
 */
688 private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) throws Exception {
689 doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
691 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
693 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// One of each modifying operation is issued before ready().
695 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
697 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
699 transactionProxy.delete(TestModel.TEST_PATH);
701 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
703 assertTrue(ready instanceof SingleCommitCohortProxy);
705 verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
/** Runs the write-only shard-lookup failure scenario with a PrimaryNotFoundException. */
709 public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
710 testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
/** Runs the write-only shard-lookup failure scenario with a NotInitializedException. */
714 public void testWriteOnlyTxWithNotInitializedException() throws Exception {
715 testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
/** Runs the write-only shard-lookup failure scenario with a NoShardLeaderException. */
719 public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
720 testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
/**
 * Two-actor write-only scenario in which the first actor answers BatchedModifications with a
 * plain {@code Object}; the cohort futures are verified against the second actor's selection
 * and IllegalArgumentException.
 */
724 public void testReadyWithInvalidReplyMessageType() throws Exception {
725 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
726 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
728 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk");
// Only the first actor gets the invalid reply; the second is set up through the normal helper.
730 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
731 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
733 expectBatchedModificationsReady(actorRef2);
735 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
737 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
738 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
740 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
742 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
744 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
745 IllegalArgumentException.class);
/**
 * Checks that a READ_ONLY proxy returns a non-null identifier whose string form contains
 * {@code memberName}.
 */
749 public void testGetIdentifier() {
750 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
751 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
753 Object id = transactionProxy.getIdentifier();
754 assertNotNull("getIdentifier returned null", id);
755 assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
/**
 * Reads and then closes a READ_WRITE proxy, and verifies that a CloseTransaction message was
 * sent to the actor's selection via sendOperationAsync.
 */
759 public void testClose() throws Exception {
760 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
762 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
763 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
765 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
// A read is issued before the close.
767 transactionProxy.read(TestModel.TEST_PATH);
769 transactionProxy.close();
771 verify(mockActorContext).sendOperationAsync(
772 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
775 private interface TransactionProxyOperation {
776 void run(TransactionProxy transactionProxy);
779 private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
780 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
// Builds a PrimaryShardInfo from an actor selection on actorRef's path and
// DataStoreVersions.CURRENT_VERSION.
// NOTE(review): the constructor call continues past the lines visible here (presumably passing
// dataTree as the final argument) - confirm against the full file.
783 private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
784 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
788 private void throttleOperation(final TransactionProxyOperation operation) {
789 throttleOperation(operation, 1, true);
792 private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
793 final boolean shardFound) {
794 throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
795 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
// Runs `operation` against a READ_WRITE TransactionProxy whose create-transaction request is stubbed
// with incompleteFuture(), then asserts that the elapsed time (System.nanoTime()) is strictly greater
// than expectedCompletionTime and strictly less than twice that value.
// NOTE(review): both a successful and a failed findPrimaryShardAsync stub for DEFAULT_SHARD appear
// below; presumably they sit in the two branches of a conditional on shardFound that is not visible
// in this view - confirm. The executeOperationAsync stub also appears to be missing its final argument here.
798 private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
799 final boolean shardFound, final long expectedCompletionTime) {
800 ActorSystem actorSystem = getSystem();
// Stand-in shard actor created from DoNothingActor.
801 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
803 // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
804 // we now allow one extra permit to be allowed for ready
805 doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
806 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
807 .getDatastoreContext();
809 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
810 .actorSelection(shardActorRef.path().toString());
// Successful primary lookups for the default shard and the "cars" shard.
813 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
814 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
815 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
816 .findPrimaryShardAsync(eq("cars"));
// Failed primary lookup for the default shard.
819 doReturn(Futures.failed(new Exception("not found")))
820 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// The create-transaction request is answered with incompleteFuture().
823 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
824 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
827 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
// Time only the operation itself.
829 long start = System.nanoTime();
831 operation.run(transactionProxy);
833 long end = System.nanoTime();
835 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
836 expectedCompletionTime, end - start),
837 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
841 private void completeOperation(final TransactionProxyOperation operation) {
842 completeOperation(operation, true);
// Runs `operation` against a READ_WRITE TransactionProxy whose create-transaction request is stubbed
// with a successful CreateTransactionReply, then asserts that the elapsed time (System.nanoTime())
// does not exceed the datastore context's operation timeout converted to nanoseconds.
// NOTE(review): both a successful and a failed (PrimaryNotFoundException) findPrimaryShardAsync stub
// for DEFAULT_SHARD appear below; presumably they sit in the two branches of a conditional on
// shardFound that is not visible in this view - confirm. The executeOperationAsync stub also appears
// to be missing its final argument here.
845 private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
846 ActorSystem actorSystem = getSystem();
// Stand-in shard actor created from DoNothingActor.
847 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
849 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
850 .actorSelection(shardActorRef.path().toString());
853 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
854 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
856 doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
857 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// Separate stand-in actor for the transaction; its path is carried in the CreateTransactionReply.
860 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
861 String actorPath = txActorRef.path().toString();
862 CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
863 DataStoreVersions.CURRENT_VERSION);
865 doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
867 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
868 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
871 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
// Time only the operation itself.
873 long start = System.nanoTime();
875 operation.run(transactionProxy);
877 long end = System.nanoTime();
// Upper bound: the configured operation timeout, in nanoseconds.
879 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
880 .getOperationTimeoutInMillis());
881 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
882 expected, end - start), end - start <= expected);
885 private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
886 ActorSystem actorSystem = getSystem();
887 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
889 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
890 .actorSelection(shardActorRef.path().toString());
892 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
893 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
895 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
897 long start = System.nanoTime();
899 operation.run(transactionProxy);
901 long end = System.nanoTime();
903 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
904 .getOperationTimeoutInMillis());
905 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
906 end - start <= expected);
// Creates a mock DataTree whose takeSnapshot() returns a mock DataTreeSnapshot, whose
// newModification() in turn returns a mock DataTreeModification (no readNode stubbing).
// NOTE(review): the return statement is not visible in this view; presumably dataTree is returned.
909 private static DataTree createDataTree() {
910 DataTree dataTree = mock(DataTree.class);
911 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
912 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
914 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
915 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
// Creates a mock DataTree -> DataTreeSnapshot -> DataTreeModification chain in which
// readNode(...) on the modification returns Optional.of(readResponse) for any YangInstanceIdentifier.
// NOTE(review): the return statement is not visible in this view; presumably dataTree is returned.
920 private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
921 DataTree dataTree = mock(DataTree.class);
922 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
923 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
925 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
926 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
927 doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
934 public void testWriteCompletionForLocalShard() {
935 completeOperationLocal(transactionProxy -> {
936 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
938 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
940 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
942 }, createDataTree());
// Passes throttleOperation a lambda that calls expectIncompleteBatchedModifications() and then
// writes the same container node to TEST_PATH twice.
// NOTE(review): the end of the throttleOperation(...) call is not visible here, so any trailing
// arguments are unknown.
946 public void testWriteThrottlingWhenShardFound() {
947 throttleOperation(transactionProxy -> {
948 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
950 expectIncompleteBatchedModifications();
952 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
954 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// Passes completeOperation a lambda that calls expectBatchedModifications(2) and then writes the
// same container node to TEST_PATH twice.
// NOTE(review): the end of the completeOperation(...) call is not visible here; given the test
// name it presumably passes shardFound = false - confirm.
959 public void testWriteThrottlingWhenShardNotFound() {
960 // Confirm that there is no throttling when the Shard is not found
961 completeOperation(transactionProxy -> {
962 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
964 expectBatchedModifications(2);
966 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
968 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// Passes completeOperation a lambda that calls expectBatchedModifications(2) and then writes the
// same container node to TEST_PATH twice.
// NOTE(review): the end of the completeOperation(...) call is not visible here.
975 public void testWriteCompletion() {
976 completeOperation(transactionProxy -> {
977 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
979 expectBatchedModifications(2);
981 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
983 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
// Passes throttleOperation a lambda that calls expectIncompleteBatchedModifications() and then
// merges the same container node into TEST_PATH twice.
// NOTE(review): the end of the throttleOperation(...) call is not visible here.
988 public void testMergeThrottlingWhenShardFound() {
989 throttleOperation(transactionProxy -> {
990 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
992 expectIncompleteBatchedModifications();
994 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
996 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
// Passes completeOperation a lambda that calls expectBatchedModifications(2) and then merges the
// same container node into TEST_PATH twice.
// NOTE(review): the end of the completeOperation(...) call is not visible here; given the test
// name it presumably passes shardFound = false - confirm.
1001 public void testMergeThrottlingWhenShardNotFound() {
1002 completeOperation(transactionProxy -> {
1003 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1005 expectBatchedModifications(2);
1007 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1009 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
// Passes completeOperation a lambda that calls expectBatchedModifications(2) and then merges the
// same container node into TEST_PATH twice.
// NOTE(review): the end of the completeOperation(...) call is not visible here.
1014 public void testMergeCompletion() {
1015 completeOperation(transactionProxy -> {
1016 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1018 expectBatchedModifications(2);
1020 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1022 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1028 public void testMergeCompletionForLocalShard() {
1029 completeOperationLocal(transactionProxy -> {
1030 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1032 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1034 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1036 }, createDataTree());
// Passes throttleOperation a lambda that calls expectIncompleteBatchedModifications() and then
// deletes TEST_PATH twice.
// NOTE(review): the end of the throttleOperation(...) call is not visible here.
1041 public void testDeleteThrottlingWhenShardFound() {
1043 throttleOperation(transactionProxy -> {
1044 expectIncompleteBatchedModifications();
1046 transactionProxy.delete(TestModel.TEST_PATH);
1048 transactionProxy.delete(TestModel.TEST_PATH);
// Passes completeOperation a lambda that calls expectBatchedModifications(2) and then deletes
// TEST_PATH twice.
// NOTE(review): the end of the completeOperation(...) call is not visible here; given the test
// name it presumably passes shardFound = false - confirm.
1054 public void testDeleteThrottlingWhenShardNotFound() {
1056 completeOperation(transactionProxy -> {
1057 expectBatchedModifications(2);
1059 transactionProxy.delete(TestModel.TEST_PATH);
1061 transactionProxy.delete(TestModel.TEST_PATH);
1066 public void testDeleteCompletionForLocalShard() {
1067 completeOperationLocal(transactionProxy -> {
1069 transactionProxy.delete(TestModel.TEST_PATH);
1071 transactionProxy.delete(TestModel.TEST_PATH);
1072 }, createDataTree());
// Passes completeOperation a lambda that calls expectBatchedModifications(2) and then deletes
// TEST_PATH twice.
// NOTE(review): the end of the completeOperation(...) call is not visible here.
1077 public void testDeleteCompletion() {
1078 completeOperation(transactionProxy -> {
1079 expectBatchedModifications(2);
1081 transactionProxy.delete(TestModel.TEST_PATH);
1083 transactionProxy.delete(TestModel.TEST_PATH);
// Passes throttleOperation a lambda that stubs the two-argument executeOperationAsync(selection,
// read-data matcher) with incompleteFuture() and then reads TEST_PATH twice.
// NOTE(review): this stub uses the two-argument overload, whereas the completion tests stub the
// three-argument (Timeout) overload - confirm which one the code under test calls.
// NOTE(review): the end of the throttleOperation(...) call is not visible here.
1089 public void testReadThrottlingWhenShardFound() {
1091 throttleOperation(transactionProxy -> {
1092 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1093 any(ActorSelection.class), eqReadData());
1095 transactionProxy.read(TestModel.TEST_PATH);
1097 transactionProxy.read(TestModel.TEST_PATH);
// Passes completeOperation a lambda that stubs the two-argument executeOperationAsync(selection,
// read-data matcher) with incompleteFuture() and then reads TEST_PATH twice.
// NOTE(review): the end of the completeOperation(...) call is not visible here; given the test
// name it presumably passes shardFound = false - confirm.
1102 public void testReadThrottlingWhenShardNotFound() {
1104 completeOperation(transactionProxy -> {
1105 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1106 any(ActorSelection.class), eqReadData());
1108 transactionProxy.read(TestModel.TEST_PATH);
1110 transactionProxy.read(TestModel.TEST_PATH);
// Passes completeOperation a lambda that stubs read requests (any selection, any Timeout) with
// readDataReply(nodeToRead) and then reads TEST_PATH twice.
// NOTE(review): the end of the completeOperation(...) call is not visible here.
1116 public void testReadCompletion() {
1117 completeOperation(transactionProxy -> {
1118 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1120 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1121 any(ActorSelection.class), eqReadData(), any(Timeout.class));
1123 transactionProxy.read(TestModel.TEST_PATH);
1125 transactionProxy.read(TestModel.TEST_PATH);
1131 public void testReadCompletionForLocalShard() {
1132 final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1133 completeOperationLocal(transactionProxy -> {
1134 transactionProxy.read(TestModel.TEST_PATH);
1136 transactionProxy.read(TestModel.TEST_PATH);
1137 }, createDataTree(nodeToRead));
1142 public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1143 completeOperationLocal(transactionProxy -> {
1144 transactionProxy.read(TestModel.TEST_PATH);
1146 transactionProxy.read(TestModel.TEST_PATH);
1147 }, createDataTree());
// Passes throttleOperation a lambda that stubs the two-argument executeOperationAsync(selection,
// data-exists matcher) with incompleteFuture() and then calls exists(TEST_PATH) twice.
// NOTE(review): the end of the throttleOperation(...) call is not visible here.
1152 public void testExistsThrottlingWhenShardFound() {
1154 throttleOperation(transactionProxy -> {
1155 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1156 any(ActorSelection.class), eqDataExists());
1158 transactionProxy.exists(TestModel.TEST_PATH);
1160 transactionProxy.exists(TestModel.TEST_PATH);
// Passes completeOperation a lambda that stubs the two-argument executeOperationAsync(selection,
// data-exists matcher) with incompleteFuture() and then calls exists(TEST_PATH) twice.
// NOTE(review): the end of the completeOperation(...) call is not visible here; given the test
// name it presumably passes shardFound = false - confirm.
1165 public void testExistsThrottlingWhenShardNotFound() {
1167 completeOperation(transactionProxy -> {
1168 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1169 any(ActorSelection.class), eqDataExists());
1171 transactionProxy.exists(TestModel.TEST_PATH);
1173 transactionProxy.exists(TestModel.TEST_PATH);
// Passes completeOperation a lambda that stubs data-exists requests (any selection, any Timeout)
// with dataExistsReply(true) and then calls exists(TEST_PATH) twice.
// NOTE(review): the end of the completeOperation(...) call is not visible here.
1179 public void testExistsCompletion() {
1180 completeOperation(transactionProxy -> {
1181 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1182 any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1184 transactionProxy.exists(TestModel.TEST_PATH);
1186 transactionProxy.exists(TestModel.TEST_PATH);
1192 public void testExistsCompletionForLocalShard() {
1193 final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1194 completeOperationLocal(transactionProxy -> {
1195 transactionProxy.exists(TestModel.TEST_PATH);
1197 transactionProxy.exists(TestModel.TEST_PATH);
1198 }, createDataTree(nodeToRead));
1203 public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1204 completeOperationLocal(transactionProxy -> {
1205 transactionProxy.exists(TestModel.TEST_PATH);
1207 transactionProxy.exists(TestModel.TEST_PATH);
1208 }, createDataTree());
// Passes throttleOperation a lambda that calls expectBatchedModifications(1), writes one container
// node to TEST_PATH and then readies the transaction.
// NOTE(review): the end of the throttleOperation(...) call is not visible here.
1213 public void testReadyThrottling() {
1215 throttleOperation(transactionProxy -> {
1216 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1218 expectBatchedModifications(1);
1220 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1222 transactionProxy.ready();
1227 public void testReadyThrottlingWithTwoTransactionContexts() {
1228 throttleOperation(transactionProxy -> {
1229 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1230 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1232 expectBatchedModifications(2);
1234 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1236 // Trying to write to Cars will cause another transaction context to get created
1237 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1239 // Now ready should block for both transaction contexts
1240 transactionProxy.ready();
1241 }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1242 .getOperationTimeoutInMillis()) * 2);
1245 private void testModificationOperationBatching(final TransactionType type) throws Exception {
1246 int shardBatchedModificationCount = 3;
1247 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1249 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1251 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1253 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1254 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1256 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1257 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1259 YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1260 NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1262 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1263 NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1265 YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1266 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1268 YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1269 NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1271 YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1272 YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1274 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1276 transactionProxy.write(writePath1, writeNode1);
1277 transactionProxy.write(writePath2, writeNode2);
1278 transactionProxy.delete(deletePath1);
1279 transactionProxy.merge(mergePath1, mergeNode1);
1280 transactionProxy.merge(mergePath2, mergeNode2);
1281 transactionProxy.write(writePath3, writeNode3);
1282 transactionProxy.merge(mergePath3, mergeNode3);
1283 transactionProxy.delete(deletePath2);
1285 // This sends the last batch.
1286 transactionProxy.ready();
1288 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1289 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1291 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1292 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1294 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1295 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1297 verifyBatchedModifications(batchedModifications.get(2), true, true,
1298 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1300 assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1304 public void testReadWriteModificationOperationBatching() throws Exception {
1305 testModificationOperationBatching(READ_WRITE);
1309 public void testWriteOnlyModificationOperationBatching() throws Exception {
1310 testModificationOperationBatching(WRITE_ONLY);
1314 public void testOptimizedWriteOnlyModificationOperationBatching() throws Exception {
1315 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1316 testModificationOperationBatching(WRITE_ONLY);
1320 public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1322 int shardBatchedModificationCount = 10;
1323 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1325 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1327 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1329 final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1330 final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1332 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1333 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1335 final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1336 final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1338 YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1339 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1341 final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1343 doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1344 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1346 doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1347 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1349 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1350 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1352 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1354 transactionProxy.write(writePath1, writeNode1);
1355 transactionProxy.write(writePath2, writeNode2);
1357 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1359 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1360 assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1362 transactionProxy.merge(mergePath1, mergeNode1);
1363 transactionProxy.merge(mergePath2, mergeNode2);
1365 readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1367 transactionProxy.delete(deletePath);
1369 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1370 assertEquals("Exists response", true, exists);
1372 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1373 assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1375 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1376 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1378 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1379 new WriteModification(writePath2, writeNode2));
1381 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1382 new MergeModification(mergePath2, mergeNode2));
1384 verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1386 InOrder inOrder = Mockito.inOrder(mockActorContext);
1387 inOrder.verify(mockActorContext).executeOperationAsync(
1388 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1390 inOrder.verify(mockActorContext).executeOperationAsync(
1391 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1393 inOrder.verify(mockActorContext).executeOperationAsync(
1394 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1396 inOrder.verify(mockActorContext).executeOperationAsync(
1397 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1399 inOrder.verify(mockActorContext).executeOperationAsync(
1400 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1402 inOrder.verify(mockActorContext).executeOperationAsync(
1403 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1407 public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException,
1408 java.util.concurrent.TimeoutException {
1409 SchemaContext schemaContext = SchemaContextHelper.full();
1410 Configuration configuration = mock(Configuration.class);
1411 doReturn(configuration).when(mockActorContext).getConfiguration();
1412 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1413 doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1415 NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1416 NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1418 setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1419 setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1421 doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1423 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1425 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1427 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1428 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1430 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1432 NormalizedNode<?, ?> normalizedNode = readOptional.get();
1434 assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1436 @SuppressWarnings("unchecked")
1437 Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1439 for (NormalizedNode<?,?> node : collection) {
1440 assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1443 assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1444 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1446 assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1448 assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1449 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1451 assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
// Stubs mockActorContext so that shard `shardName` resolves to a fresh DoNothingActor, a READ_ONLY
// create-transaction request to that shard yields a reply pointing at a second DoNothingActor, and a
// read of YangInstanceIdentifier.EMPTY on that transaction actor answers with readDataReply(expectedNode).
// NOTE(review): this is the last block visible in this view; the end of the method is not shown.
1455 private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
1456 ActorSystem actorSystem = getSystem();
// NOTE(review): getSystem() is called again below although actorSystem already holds the result.
1457 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1459 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1460 .actorSelection(shardActorRef.path().toString());
// Primary-shard lookup for shardName.
1462 doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1463 .findPrimaryShardAsync(eq(shardName));
// Transaction actor that the create-transaction reply points at.
1465 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1467 doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1468 .actorSelection(txActorRef.path().toString());
1470 doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1471 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1472 eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
// Root read on the transaction actor.
1474 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1475 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));