2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.ActorSystem;
29 import akka.actor.Props;
30 import akka.dispatch.Futures;
31 import akka.util.Timeout;
32 import com.google.common.base.Throwables;
33 import com.google.common.collect.ImmutableSortedSet;
34 import com.google.common.collect.Sets;
35 import com.google.common.util.concurrent.FluentFuture;
36 import com.google.common.util.concurrent.FutureCallback;
37 import com.google.common.util.concurrent.MoreExecutors;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import java.util.Collection;
40 import java.util.List;
41 import java.util.Optional;
42 import java.util.SortedSet;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.Assert;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.InOrder;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.access.concepts.MemberName;
53 import org.opendaylight.controller.cluster.datastore.config.Configuration;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
63 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
64 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
65 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
67 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
68 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
69 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
70 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
71 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
72 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
73 import org.opendaylight.mdsal.common.api.ReadFailedException;
74 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
75 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
76 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
77 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
79 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
80 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
81 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
82 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
83 import scala.concurrent.Promise;
85 @SuppressWarnings({"resource", "checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
86 public class TransactionProxyTest extends AbstractTransactionProxyTest {
88 @SuppressWarnings("serial")
89 static class TestException extends RuntimeException {
93 FluentFuture<?> invoke(TransactionProxy proxy);
97 public void testRead() throws Exception {
98 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
100 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
102 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
103 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
105 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
106 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
108 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
110 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
112 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
113 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
115 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
117 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
119 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
122 @Test(expected = ReadFailedException.class)
123 public void testReadWithInvalidReplyMessageType() throws Throwable {
124 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
126 doReturn(Futures.successful(new Object())).when(mockActorContext)
127 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
129 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
132 transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
133 } catch (ExecutionException e) {
138 @Test(expected = TestException.class)
139 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
140 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
142 doReturn(Futures.failed(new TestException())).when(mockActorContext)
143 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
145 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
147 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
150 private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
152 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
154 if (exToThrow instanceof PrimaryNotFoundException) {
155 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
157 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
158 .findPrimaryShardAsync(anyString());
161 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
162 any(ActorSelection.class), any(), any(Timeout.class));
164 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
166 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
169 private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Throwable {
170 testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
173 @Test(expected = PrimaryNotFoundException.class)
174 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
175 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
178 @Test(expected = TestException.class)
179 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
180 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
181 new TestException()));
184 @Test(expected = TestException.class)
185 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
186 testReadWithExceptionOnInitialCreateTransaction(new TestException());
190 public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
191 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
193 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
195 expectBatchedModifications(actorRef, 1);
197 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
198 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
200 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
202 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
204 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
205 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
207 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
208 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
210 InOrder inOrder = Mockito.inOrder(mockActorContext);
211 inOrder.verify(mockActorContext).executeOperationAsync(
212 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
214 inOrder.verify(mockActorContext).executeOperationAsync(
215 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
218 @Test(expected = IllegalStateException.class)
219 public void testReadPreConditionCheck() {
220 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
221 transactionProxy.read(TestModel.TEST_PATH);
224 @Test(expected = IllegalArgumentException.class)
225 public void testInvalidCreateTransactionReply() throws Throwable {
226 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
228 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
229 .actorSelection(actorRef.path().toString());
231 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
232 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
234 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
235 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
238 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
240 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
244 public void testExists() throws Exception {
245 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
247 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
249 doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
250 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
252 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
254 assertEquals("Exists response", false, exists);
256 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
257 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
259 exists = transactionProxy.exists(TestModel.TEST_PATH).get();
261 assertEquals("Exists response", true, exists);
264 @Test(expected = PrimaryNotFoundException.class)
265 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
266 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
267 proxy -> proxy.exists(TestModel.TEST_PATH));
270 @Test(expected = ReadFailedException.class)
271 public void testExistsWithInvalidReplyMessageType() throws Throwable {
272 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
274 doReturn(Futures.successful(new Object())).when(mockActorContext)
275 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
277 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
280 transactionProxy.exists(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
281 } catch (ExecutionException e) {
286 @Test(expected = TestException.class)
287 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
288 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
290 doReturn(Futures.failed(new TestException())).when(mockActorContext)
291 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
293 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
295 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
299 public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
300 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
302 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
304 expectBatchedModifications(actorRef, 1);
306 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
307 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
309 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
311 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
313 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
315 assertEquals("Exists response", true, exists);
317 InOrder inOrder = Mockito.inOrder(mockActorContext);
318 inOrder.verify(mockActorContext).executeOperationAsync(
319 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
321 inOrder.verify(mockActorContext).executeOperationAsync(
322 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
325 @Test(expected = IllegalStateException.class)
326 public void testExistsPreConditionCheck() {
327 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
328 transactionProxy.exists(TestModel.TEST_PATH);
332 public void testWrite() {
333 dataStoreContextBuilder.shardBatchedModificationCount(1);
334 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
336 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
338 expectBatchedModifications(actorRef, 1);
340 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
342 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
344 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
348 @SuppressWarnings("checkstyle:IllegalCatch")
349 public void testWriteAfterAsyncRead() throws Exception {
350 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
351 DefaultShardStrategy.DEFAULT_SHARD);
353 Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
354 doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
355 eq(getSystem().actorSelection(actorRef.path())),
356 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
358 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
359 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
361 expectBatchedModificationsReady(actorRef);
363 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
365 final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
367 final CountDownLatch readComplete = new CountDownLatch(1);
368 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
369 com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
370 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
372 public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
374 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
375 } catch (Exception e) {
378 readComplete.countDown();
383 public void onFailure(final Throwable failure) {
384 caughtEx.set(failure);
385 readComplete.countDown();
387 }, MoreExecutors.directExecutor());
389 createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
391 Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
393 final Throwable t = caughtEx.get();
395 Throwables.propagateIfPossible(t, Exception.class);
396 throw new RuntimeException(t);
399 // This sends the batched modification.
400 transactionProxy.ready();
402 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
405 @Test(expected = IllegalStateException.class)
406 public void testWritePreConditionCheck() {
407 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
408 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
411 @Test(expected = IllegalStateException.class)
412 public void testWriteAfterReadyPreConditionCheck() {
413 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
415 transactionProxy.ready();
417 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
421 public void testMerge() {
422 dataStoreContextBuilder.shardBatchedModificationCount(1);
423 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
425 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
427 expectBatchedModifications(actorRef, 1);
429 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
431 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
433 verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
437 public void testDelete() {
438 dataStoreContextBuilder.shardBatchedModificationCount(1);
439 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
441 expectBatchedModifications(actorRef, 1);
443 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
445 transactionProxy.delete(TestModel.TEST_PATH);
447 verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
451 public void testReadWrite() {
452 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
454 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
456 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
457 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
459 expectBatchedModifications(actorRef, 1);
461 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
463 transactionProxy.read(TestModel.TEST_PATH);
465 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
467 transactionProxy.read(TestModel.TEST_PATH);
469 transactionProxy.read(TestModel.TEST_PATH);
471 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
472 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
474 verifyBatchedModifications(batchedModifications.get(0), false,
475 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
479 public void testReadyWithReadWrite() {
480 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
482 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
484 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
485 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
487 expectBatchedModificationsReady(actorRef, true);
489 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
491 transactionProxy.read(TestModel.TEST_PATH);
493 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
495 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
497 assertTrue(ready instanceof SingleCommitCohortProxy);
499 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
501 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
502 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
504 verifyBatchedModifications(batchedModifications.get(0), true, true,
505 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
507 assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
511 public void testReadyWithNoModifications() {
512 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
514 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
515 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
517 expectBatchedModificationsReady(actorRef, true);
519 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
521 transactionProxy.read(TestModel.TEST_PATH);
523 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
525 assertTrue(ready instanceof SingleCommitCohortProxy);
527 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
529 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
530 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
532 verifyBatchedModifications(batchedModifications.get(0), true, true);
536 public void testReadyWithMultipleShardWrites() {
537 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
539 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
540 TestModel.JUNK_QNAME.getLocalName());
542 expectBatchedModificationsReady(actorRef1);
543 expectBatchedModificationsReady(actorRef2);
545 ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
547 doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
548 .actorSelection(actorRef3.path().toString());
550 doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
551 .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
553 expectReadyLocalTransaction(actorRef3, false);
555 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
557 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
558 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
559 transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
561 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
563 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
565 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
566 actorSelection(actorRef2), actorSelection(actorRef3));
568 SortedSet<String> expShardNames =
569 ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
570 TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
572 ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
573 verify(mockActorContext).executeOperationAsync(
574 eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
575 assertEquals("Participating shards present", true,
576 batchedMods.getValue().getParticipatingShardNames().isPresent());
577 assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
579 batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
580 verify(mockActorContext).executeOperationAsync(
581 eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
582 assertEquals("Participating shards present", true,
583 batchedMods.getValue().getParticipatingShardNames().isPresent());
584 assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
586 ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
587 verify(mockActorContext).executeOperationAsync(
588 eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
589 assertEquals("Participating shards present", true,
590 readyLocalTx.getValue().getParticipatingShardNames().isPresent());
591 assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get());
595 public void testReadyWithWriteOnlyAndLastBatchPending() {
596 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
598 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
600 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
602 expectBatchedModificationsReady(actorRef, true);
604 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
606 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
608 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
610 assertTrue(ready instanceof SingleCommitCohortProxy);
612 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
614 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
615 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
617 verifyBatchedModifications(batchedModifications.get(0), true, true,
618 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
622 public void testReadyWithWriteOnlyAndLastBatchEmpty() {
623 dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
624 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
626 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
628 expectBatchedModificationsReady(actorRef, true);
630 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
632 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
634 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
636 assertTrue(ready instanceof SingleCommitCohortProxy);
638 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
640 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
641 assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
643 verifyBatchedModifications(batchedModifications.get(0), false,
644 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
646 verifyBatchedModifications(batchedModifications.get(1), true, true);
650 public void testReadyWithReplyFailure() {
651 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
653 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
655 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
657 expectFailedBatchedModifications(actorRef);
659 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
661 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
663 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
665 assertTrue(ready instanceof SingleCommitCohortProxy);
667 verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
671 public void testReadyWithDebugContextEnabled() {
672 dataStoreContextBuilder.transactionDebugContextEnabled(true);
674 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
676 expectBatchedModificationsReady(actorRef, true);
678 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
680 transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
682 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
684 assertTrue(ready instanceof DebugThreePhaseCommitCohort);
686 verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
690 public void testReadyWithLocalTransaction() {
691 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
693 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
694 .actorSelection(shardActorRef.path().toString());
696 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
697 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
699 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
701 expectReadyLocalTransaction(shardActorRef, true);
703 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
704 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
706 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
707 assertTrue(ready instanceof SingleCommitCohortProxy);
708 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
710 ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
711 verify(mockActorContext).executeOperationAsync(
712 eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
713 assertEquals("Participating shards present", false,
714 readyLocalTx.getValue().getParticipatingShardNames().isPresent());
718 public void testReadyWithLocalTransactionWithFailure() {
719 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
721 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
722 .actorSelection(shardActorRef.path().toString());
724 DataTree mockDataTree = createDataTree();
725 DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
726 doThrow(new RuntimeException("mock")).when(mockModification).ready();
728 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
729 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
731 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
733 expectReadyLocalTransaction(shardActorRef, true);
735 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
736 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
738 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
739 assertTrue(ready instanceof SingleCommitCohortProxy);
740 verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
743 private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) {
744 doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
746 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
748 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
750 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
752 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
754 transactionProxy.delete(TestModel.TEST_PATH);
756 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
758 assertTrue(ready instanceof SingleCommitCohortProxy);
760 verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
764 public void testWriteOnlyTxWithPrimaryNotFoundException() {
765 testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
769 public void testWriteOnlyTxWithNotInitializedException() {
770 testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
774 public void testWriteOnlyTxWithNoShardLeaderException() {
775 testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
779 public void testReadyWithInvalidReplyMessageType() {
780 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
781 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
783 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
784 TestModel.JUNK_QNAME.getLocalName());
786 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
787 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
789 expectBatchedModificationsReady(actorRef2);
791 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
793 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
794 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
796 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
798 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
800 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
801 IllegalArgumentException.class);
805 public void testGetIdentifier() {
806 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
807 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
809 Object id = transactionProxy.getIdentifier();
810 assertNotNull("getIdentifier returned null", id);
811 assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
815 public void testClose() {
816 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
818 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
819 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
821 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
823 transactionProxy.read(TestModel.TEST_PATH);
825 transactionProxy.close();
827 verify(mockActorContext).sendOperationAsync(
828 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
831 private interface TransactionProxyOperation {
832 void run(TransactionProxy transactionProxy);
835 private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
836 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
839 private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
840 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
844 private void throttleOperation(final TransactionProxyOperation operation) {
845 throttleOperation(operation, 1, true);
848 private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
849 final boolean shardFound) {
850 throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
851 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
854 private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
855 final boolean shardFound, final long expectedCompletionTime) {
856 ActorSystem actorSystem = getSystem();
857 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
859 // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
860 // we now allow one extra permit to be allowed for ready
861 doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
862 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
863 .getDatastoreContext();
865 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
866 .actorSelection(shardActorRef.path().toString());
869 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
870 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
871 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
872 .findPrimaryShardAsync(eq("cars"));
875 doReturn(Futures.failed(new Exception("not found")))
876 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
879 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
880 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
883 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
885 long start = System.nanoTime();
887 operation.run(transactionProxy);
889 long end = System.nanoTime();
891 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
892 expectedCompletionTime, end - start),
893 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
897 private void completeOperation(final TransactionProxyOperation operation) {
898 completeOperation(operation, true);
901 private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
902 ActorSystem actorSystem = getSystem();
903 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
905 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
906 .actorSelection(shardActorRef.path().toString());
909 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
910 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
912 doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
913 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
916 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
917 String actorPath = txActorRef.path().toString();
918 CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
919 DataStoreVersions.CURRENT_VERSION);
921 doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
923 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
924 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
927 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
929 long start = System.nanoTime();
931 operation.run(transactionProxy);
933 long end = System.nanoTime();
935 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
936 .getOperationTimeoutInMillis());
937 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
938 expected, end - start), end - start <= expected);
941 private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
942 ActorSystem actorSystem = getSystem();
943 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
945 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
946 .actorSelection(shardActorRef.path().toString());
948 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
949 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
951 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
953 long start = System.nanoTime();
955 operation.run(transactionProxy);
957 long end = System.nanoTime();
959 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
960 .getOperationTimeoutInMillis());
961 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
962 end - start <= expected);
965 private static DataTree createDataTree() {
966 DataTree dataTree = mock(DataTree.class);
967 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
968 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
970 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
971 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
976 private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
977 DataTree dataTree = mock(DataTree.class);
978 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
979 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
981 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
982 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
983 doReturn(java.util.Optional.of(readResponse)).when(dataTreeModification).readNode(
984 any(YangInstanceIdentifier.class));
991 public void testWriteCompletionForLocalShard() {
992 completeOperationLocal(transactionProxy -> {
993 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
995 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
997 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
999 }, createDataTree());
1003 public void testWriteThrottlingWhenShardFound() {
1004 throttleOperation(transactionProxy -> {
1005 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1007 expectIncompleteBatchedModifications();
1009 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1011 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1016 public void testWriteThrottlingWhenShardNotFound() {
1017 // Confirm that there is no throttling when the Shard is not found
1018 completeOperation(transactionProxy -> {
1019 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1021 expectBatchedModifications(2);
1023 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1025 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1032 public void testWriteCompletion() {
1033 completeOperation(transactionProxy -> {
1034 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1036 expectBatchedModifications(2);
1038 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1040 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1045 public void testMergeThrottlingWhenShardFound() {
1046 throttleOperation(transactionProxy -> {
1047 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1049 expectIncompleteBatchedModifications();
1051 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1053 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1058 public void testMergeThrottlingWhenShardNotFound() {
1059 completeOperation(transactionProxy -> {
1060 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1062 expectBatchedModifications(2);
1064 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1066 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1071 public void testMergeCompletion() {
1072 completeOperation(transactionProxy -> {
1073 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1075 expectBatchedModifications(2);
1077 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1079 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1085 public void testMergeCompletionForLocalShard() {
1086 completeOperationLocal(transactionProxy -> {
1087 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1089 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1091 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1093 }, createDataTree());
1098 public void testDeleteThrottlingWhenShardFound() {
1100 throttleOperation(transactionProxy -> {
1101 expectIncompleteBatchedModifications();
1103 transactionProxy.delete(TestModel.TEST_PATH);
1105 transactionProxy.delete(TestModel.TEST_PATH);
1111 public void testDeleteThrottlingWhenShardNotFound() {
1113 completeOperation(transactionProxy -> {
1114 expectBatchedModifications(2);
1116 transactionProxy.delete(TestModel.TEST_PATH);
1118 transactionProxy.delete(TestModel.TEST_PATH);
1123 public void testDeleteCompletionForLocalShard() {
1124 completeOperationLocal(transactionProxy -> {
1126 transactionProxy.delete(TestModel.TEST_PATH);
1128 transactionProxy.delete(TestModel.TEST_PATH);
1129 }, createDataTree());
1134 public void testDeleteCompletion() {
1135 completeOperation(transactionProxy -> {
1136 expectBatchedModifications(2);
1138 transactionProxy.delete(TestModel.TEST_PATH);
1140 transactionProxy.delete(TestModel.TEST_PATH);
1146 public void testReadThrottlingWhenShardFound() {
1148 throttleOperation(transactionProxy -> {
1149 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1150 any(ActorSelection.class), eqReadData());
1152 transactionProxy.read(TestModel.TEST_PATH);
1154 transactionProxy.read(TestModel.TEST_PATH);
1159 public void testReadThrottlingWhenShardNotFound() {
1161 completeOperation(transactionProxy -> {
1162 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1163 any(ActorSelection.class), eqReadData());
1165 transactionProxy.read(TestModel.TEST_PATH);
1167 transactionProxy.read(TestModel.TEST_PATH);
1173 public void testReadCompletion() {
1174 completeOperation(transactionProxy -> {
1175 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1177 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1178 any(ActorSelection.class), eqReadData(), any(Timeout.class));
1180 transactionProxy.read(TestModel.TEST_PATH);
1182 transactionProxy.read(TestModel.TEST_PATH);
1188 public void testReadCompletionForLocalShard() {
1189 final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1190 completeOperationLocal(transactionProxy -> {
1191 transactionProxy.read(TestModel.TEST_PATH);
1193 transactionProxy.read(TestModel.TEST_PATH);
1194 }, createDataTree(nodeToRead));
1199 public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1200 completeOperationLocal(transactionProxy -> {
1201 transactionProxy.read(TestModel.TEST_PATH);
1203 transactionProxy.read(TestModel.TEST_PATH);
1204 }, createDataTree());
1209 public void testExistsThrottlingWhenShardFound() {
1211 throttleOperation(transactionProxy -> {
1212 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1213 any(ActorSelection.class), eqDataExists());
1215 transactionProxy.exists(TestModel.TEST_PATH);
1217 transactionProxy.exists(TestModel.TEST_PATH);
1222 public void testExistsThrottlingWhenShardNotFound() {
1224 completeOperation(transactionProxy -> {
1225 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1226 any(ActorSelection.class), eqDataExists());
1228 transactionProxy.exists(TestModel.TEST_PATH);
1230 transactionProxy.exists(TestModel.TEST_PATH);
1236 public void testExistsCompletion() {
1237 completeOperation(transactionProxy -> {
1238 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1239 any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1241 transactionProxy.exists(TestModel.TEST_PATH);
1243 transactionProxy.exists(TestModel.TEST_PATH);
1249 public void testExistsCompletionForLocalShard() {
1250 final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1251 completeOperationLocal(transactionProxy -> {
1252 transactionProxy.exists(TestModel.TEST_PATH);
1254 transactionProxy.exists(TestModel.TEST_PATH);
1255 }, createDataTree(nodeToRead));
1260 public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1261 completeOperationLocal(transactionProxy -> {
1262 transactionProxy.exists(TestModel.TEST_PATH);
1264 transactionProxy.exists(TestModel.TEST_PATH);
1265 }, createDataTree());
1270 public void testReadyThrottling() {
1272 throttleOperation(transactionProxy -> {
1273 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1275 expectBatchedModifications(1);
1277 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1279 transactionProxy.ready();
1284 public void testReadyThrottlingWithTwoTransactionContexts() {
1285 throttleOperation(transactionProxy -> {
1286 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1287 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1289 expectBatchedModifications(2);
1291 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1293 // Trying to write to Cars will cause another transaction context to get created
1294 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1296 // Now ready should block for both transaction contexts
1297 transactionProxy.ready();
1298 }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1299 .getOperationTimeoutInMillis()) * 2);
1302 private void testModificationOperationBatching(final TransactionType type) {
1303 int shardBatchedModificationCount = 3;
1304 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1306 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1308 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1310 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1311 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1313 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1314 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1316 YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1317 NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1319 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1320 NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1322 YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1323 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1325 YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1326 NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1328 YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1329 YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1331 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1333 transactionProxy.write(writePath1, writeNode1);
1334 transactionProxy.write(writePath2, writeNode2);
1335 transactionProxy.delete(deletePath1);
1336 transactionProxy.merge(mergePath1, mergeNode1);
1337 transactionProxy.merge(mergePath2, mergeNode2);
1338 transactionProxy.write(writePath3, writeNode3);
1339 transactionProxy.merge(mergePath3, mergeNode3);
1340 transactionProxy.delete(deletePath2);
1342 // This sends the last batch.
1343 transactionProxy.ready();
1345 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1346 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1348 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1349 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1351 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1352 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1354 verifyBatchedModifications(batchedModifications.get(2), true, true,
1355 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1357 assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1361 public void testReadWriteModificationOperationBatching() {
1362 testModificationOperationBatching(READ_WRITE);
1366 public void testWriteOnlyModificationOperationBatching() {
1367 testModificationOperationBatching(WRITE_ONLY);
1371 public void testOptimizedWriteOnlyModificationOperationBatching() {
1372 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1373 testModificationOperationBatching(WRITE_ONLY);
1377 public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1379 int shardBatchedModificationCount = 10;
1380 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1382 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1384 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1386 final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1387 final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1389 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1390 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1392 final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1393 final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1395 YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1396 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1398 final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1400 doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1401 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1403 doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1404 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1406 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1407 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1409 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1411 transactionProxy.write(writePath1, writeNode1);
1412 transactionProxy.write(writePath2, writeNode2);
1414 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1416 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1417 assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1419 transactionProxy.merge(mergePath1, mergeNode1);
1420 transactionProxy.merge(mergePath2, mergeNode2);
1422 readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1424 transactionProxy.delete(deletePath);
1426 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
1427 assertEquals("Exists response", true, exists);
1429 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1430 assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1432 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1433 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1435 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1436 new WriteModification(writePath2, writeNode2));
1438 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1439 new MergeModification(mergePath2, mergeNode2));
1441 verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1443 InOrder inOrder = Mockito.inOrder(mockActorContext);
1444 inOrder.verify(mockActorContext).executeOperationAsync(
1445 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1447 inOrder.verify(mockActorContext).executeOperationAsync(
1448 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1450 inOrder.verify(mockActorContext).executeOperationAsync(
1451 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1453 inOrder.verify(mockActorContext).executeOperationAsync(
1454 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1456 inOrder.verify(mockActorContext).executeOperationAsync(
1457 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1459 inOrder.verify(mockActorContext).executeOperationAsync(
1460 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1464 public void testReadRoot() throws InterruptedException, ExecutionException,
1465 java.util.concurrent.TimeoutException {
1466 SchemaContext schemaContext = SchemaContextHelper.full();
1467 Configuration configuration = mock(Configuration.class);
1468 doReturn(configuration).when(mockActorContext).getConfiguration();
1469 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1470 doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1472 NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1473 NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1475 setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1476 setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1478 doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1480 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1482 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1484 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1485 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1487 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1489 NormalizedNode<?, ?> normalizedNode = readOptional.get();
1491 assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1493 @SuppressWarnings("unchecked")
1494 Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1496 for (NormalizedNode<?,?> node : collection) {
1497 assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1500 assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1501 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1503 assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1505 assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1506 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1508 assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1512 private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
1513 ActorSystem actorSystem = getSystem();
1514 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1516 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1517 .actorSelection(shardActorRef.path().toString());
1519 doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1520 .findPrimaryShardAsync(eq(shardName));
1522 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1524 doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1525 .actorSelection(txActorRef.path().toString());
1527 doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1528 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1529 eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1531 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1532 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));