2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.ArgumentMatchers.any;
16 import static org.mockito.ArgumentMatchers.anyString;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.ArgumentMatchers.isA;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.doThrow;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
25 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
27 import akka.actor.ActorRef;
28 import akka.actor.ActorSelection;
29 import akka.actor.ActorSystem;
30 import akka.actor.Props;
31 import akka.dispatch.Futures;
32 import akka.util.Timeout;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.ImmutableSortedSet;
35 import com.google.common.collect.Sets;
36 import com.google.common.util.concurrent.FluentFuture;
37 import com.google.common.util.concurrent.FutureCallback;
38 import com.google.common.util.concurrent.MoreExecutors;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.util.Collection;
41 import java.util.List;
42 import java.util.Optional;
43 import java.util.SortedSet;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.Assert;
49 import org.junit.Test;
50 import org.mockito.ArgumentCaptor;
51 import org.mockito.InOrder;
52 import org.mockito.Mockito;
53 import org.opendaylight.controller.cluster.access.concepts.MemberName;
54 import org.opendaylight.controller.cluster.datastore.config.Configuration;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
58 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
60 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
65 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
66 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
67 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
68 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
69 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
70 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
71 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
72 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
73 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
74 import org.opendaylight.mdsal.common.api.ReadFailedException;
75 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
77 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
79 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
80 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
81 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
82 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
83 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
84 import scala.concurrent.Promise;
86 @SuppressWarnings({"resource", "checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
87 public class TransactionProxyTest extends AbstractTransactionProxyTest {
89 @SuppressWarnings("serial")
90 static class TestException extends RuntimeException {
94 FluentFuture<?> invoke(TransactionProxy proxy);
98 public void testRead() throws Exception {
99 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
101 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
103 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
104 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
106 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
107 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
109 assertFalse("NormalizedNode isPresent", readOptional.isPresent());
111 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
113 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
114 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
116 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
118 assertTrue("NormalizedNode isPresent", readOptional.isPresent());
120 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
123 @Test(expected = ReadFailedException.class)
124 public void testReadWithInvalidReplyMessageType() throws Throwable {
125 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
127 doReturn(Futures.successful(new Object())).when(mockActorContext)
128 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
130 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
133 transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
134 } catch (ExecutionException e) {
139 @Test(expected = TestException.class)
140 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
141 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
143 doReturn(Futures.failed(new TestException())).when(mockActorContext)
144 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
146 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
148 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
151 private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
153 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
155 if (exToThrow instanceof PrimaryNotFoundException) {
156 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
158 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
159 .findPrimaryShardAsync(anyString());
162 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
163 any(ActorSelection.class), any(), any(Timeout.class));
165 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
167 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
170 private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Throwable {
171 testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
174 @Test(expected = PrimaryNotFoundException.class)
175 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
176 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
179 @Test(expected = TestException.class)
180 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
181 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
182 new TestException()));
185 @Test(expected = TestException.class)
186 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
187 testReadWithExceptionOnInitialCreateTransaction(new TestException());
191 public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
192 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
194 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
196 expectBatchedModifications(actorRef, 1);
198 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
199 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
201 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
203 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
205 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
206 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
208 assertTrue("NormalizedNode isPresent", readOptional.isPresent());
209 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
211 InOrder inOrder = Mockito.inOrder(mockActorContext);
212 inOrder.verify(mockActorContext).executeOperationAsync(
213 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
215 inOrder.verify(mockActorContext).executeOperationAsync(
216 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
219 @Test(expected = IllegalStateException.class)
220 public void testReadPreConditionCheck() {
221 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
222 transactionProxy.read(TestModel.TEST_PATH);
225 @Test(expected = IllegalArgumentException.class)
226 public void testInvalidCreateTransactionReply() throws Throwable {
227 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
229 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
230 .actorSelection(actorRef.path().toString());
232 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
233 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
235 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
236 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
239 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
241 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
245 public void testExists() throws Exception {
246 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
248 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
250 doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
251 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
253 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
255 assertEquals("Exists response", Boolean.FALSE, exists);
257 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
258 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
260 exists = transactionProxy.exists(TestModel.TEST_PATH).get();
262 assertEquals("Exists response", Boolean.TRUE, exists);
265 @Test(expected = PrimaryNotFoundException.class)
266 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
267 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
268 proxy -> proxy.exists(TestModel.TEST_PATH));
271 @Test(expected = ReadFailedException.class)
272 public void testExistsWithInvalidReplyMessageType() throws Throwable {
273 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
275 doReturn(Futures.successful(new Object())).when(mockActorContext)
276 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
278 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
281 transactionProxy.exists(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
282 } catch (ExecutionException e) {
287 @Test(expected = TestException.class)
288 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
289 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
291 doReturn(Futures.failed(new TestException())).when(mockActorContext)
292 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
294 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
296 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
300 public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
301 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
303 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
305 expectBatchedModifications(actorRef, 1);
307 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
308 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
310 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
312 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
314 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
316 assertEquals("Exists response", Boolean.TRUE, exists);
318 InOrder inOrder = Mockito.inOrder(mockActorContext);
319 inOrder.verify(mockActorContext).executeOperationAsync(
320 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
322 inOrder.verify(mockActorContext).executeOperationAsync(
323 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
326 @Test(expected = IllegalStateException.class)
327 public void testExistsPreConditionCheck() {
328 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
329 transactionProxy.exists(TestModel.TEST_PATH);
333 public void testWrite() {
334 dataStoreContextBuilder.shardBatchedModificationCount(1);
335 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
337 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
339 expectBatchedModifications(actorRef, 1);
341 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
343 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
345 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
349 @SuppressWarnings("checkstyle:IllegalCatch")
350 public void testWriteAfterAsyncRead() throws Exception {
351 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
352 DefaultShardStrategy.DEFAULT_SHARD);
354 Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
355 doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
356 eq(getSystem().actorSelection(actorRef.path())),
357 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
359 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
360 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
362 expectBatchedModificationsReady(actorRef);
364 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
366 final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
368 final CountDownLatch readComplete = new CountDownLatch(1);
369 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
370 com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
371 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
373 public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
375 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
376 } catch (Exception e) {
379 readComplete.countDown();
384 public void onFailure(final Throwable failure) {
385 caughtEx.set(failure);
386 readComplete.countDown();
388 }, MoreExecutors.directExecutor());
390 createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
392 Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
394 final Throwable t = caughtEx.get();
396 Throwables.propagateIfPossible(t, Exception.class);
397 throw new RuntimeException(t);
400 // This sends the batched modification.
401 transactionProxy.ready();
403 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
406 @Test(expected = IllegalStateException.class)
407 public void testWritePreConditionCheck() {
408 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
409 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
412 @Test(expected = IllegalStateException.class)
413 public void testWriteAfterReadyPreConditionCheck() {
414 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
416 transactionProxy.ready();
418 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
422 public void testMerge() {
423 dataStoreContextBuilder.shardBatchedModificationCount(1);
424 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
426 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
428 expectBatchedModifications(actorRef, 1);
430 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
432 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
434 verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
438 public void testDelete() {
439 dataStoreContextBuilder.shardBatchedModificationCount(1);
440 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
442 expectBatchedModifications(actorRef, 1);
444 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
446 transactionProxy.delete(TestModel.TEST_PATH);
448 verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
452 public void testReadWrite() {
453 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
455 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
457 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
458 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
460 expectBatchedModifications(actorRef, 1);
462 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
464 transactionProxy.read(TestModel.TEST_PATH);
466 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
468 transactionProxy.read(TestModel.TEST_PATH);
470 transactionProxy.read(TestModel.TEST_PATH);
472 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
473 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
475 verifyBatchedModifications(batchedModifications.get(0), false,
476 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
480 public void testReadyWithReadWrite() {
481 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
483 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
485 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
486 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
488 expectBatchedModificationsReady(actorRef, true);
490 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
492 transactionProxy.read(TestModel.TEST_PATH);
494 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
496 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
498 assertTrue(ready instanceof SingleCommitCohortProxy);
500 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
502 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
503 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
505 verifyBatchedModifications(batchedModifications.get(0), true, true,
506 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
508 assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
512 public void testReadyWithNoModifications() {
513 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
515 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
516 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
518 expectBatchedModificationsReady(actorRef, true);
520 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
522 transactionProxy.read(TestModel.TEST_PATH);
524 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
526 assertTrue(ready instanceof SingleCommitCohortProxy);
528 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
530 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
531 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
533 verifyBatchedModifications(batchedModifications.get(0), true, true);
537 public void testReadyWithMultipleShardWrites() {
538 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
540 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
541 TestModel.JUNK_QNAME.getLocalName());
543 expectBatchedModificationsReady(actorRef1);
544 expectBatchedModificationsReady(actorRef2);
546 ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
548 doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
549 .actorSelection(actorRef3.path().toString());
551 doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
552 .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
554 expectReadyLocalTransaction(actorRef3, false);
556 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
558 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
559 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
560 transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
562 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
564 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
566 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
567 actorSelection(actorRef2), actorSelection(actorRef3));
569 SortedSet<String> expShardNames =
570 ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
571 TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
573 ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
574 verify(mockActorContext).executeOperationAsync(
575 eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
576 assertTrue("Participating shards present", batchedMods.getValue().getParticipatingShardNames().isPresent());
577 assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
579 batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
580 verify(mockActorContext).executeOperationAsync(
581 eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
582 assertTrue("Participating shards present", batchedMods.getValue().getParticipatingShardNames().isPresent());
583 assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
585 ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
586 verify(mockActorContext).executeOperationAsync(
587 eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
588 assertTrue("Participating shards present", readyLocalTx.getValue().getParticipatingShardNames().isPresent());
589 assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get());
593 public void testReadyWithWriteOnlyAndLastBatchPending() {
594 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
596 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
598 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
600 expectBatchedModificationsReady(actorRef, true);
602 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
604 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
606 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
608 assertTrue(ready instanceof SingleCommitCohortProxy);
610 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
612 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
613 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
615 verifyBatchedModifications(batchedModifications.get(0), true, true,
616 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
620 public void testReadyWithWriteOnlyAndLastBatchEmpty() {
621 dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
622 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
624 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
626 expectBatchedModificationsReady(actorRef, true);
628 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
630 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
632 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
634 assertTrue(ready instanceof SingleCommitCohortProxy);
636 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
638 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
639 assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
641 verifyBatchedModifications(batchedModifications.get(0), false,
642 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
644 verifyBatchedModifications(batchedModifications.get(1), true, true);
648 public void testReadyWithReplyFailure() {
649 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
651 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
653 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
655 expectFailedBatchedModifications(actorRef);
657 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
659 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
661 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
663 assertTrue(ready instanceof SingleCommitCohortProxy);
665 verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
669 public void testReadyWithDebugContextEnabled() {
670 dataStoreContextBuilder.transactionDebugContextEnabled(true);
672 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
674 expectBatchedModificationsReady(actorRef, true);
676 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
678 transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
680 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
682 assertTrue(ready instanceof DebugThreePhaseCommitCohort);
684 verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
688 public void testReadyWithLocalTransaction() {
689 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
691 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
692 .actorSelection(shardActorRef.path().toString());
694 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
695 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
697 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
699 expectReadyLocalTransaction(shardActorRef, true);
701 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
702 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
704 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
705 assertTrue(ready instanceof SingleCommitCohortProxy);
706 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
708 ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
709 verify(mockActorContext).executeOperationAsync(
710 eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
711 assertFalse("Participating shards present", readyLocalTx.getValue().getParticipatingShardNames().isPresent());
715 public void testReadyWithLocalTransactionWithFailure() {
716 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
718 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
719 .actorSelection(shardActorRef.path().toString());
721 DataTree mockDataTree = createDataTree();
722 DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
723 doThrow(new RuntimeException("mock")).when(mockModification).ready();
725 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
726 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
728 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
730 expectReadyLocalTransaction(shardActorRef, true);
732 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
733 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
735 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
736 assertTrue(ready instanceof SingleCommitCohortProxy);
737 verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
/**
 * Shared body for the find-primary-shard failure tests: every primary shard lookup fails with
 * {@code toThrow}, a WRITE_ONLY transaction performs a merge, a write and a delete, and the
 * cohort returned by {@code ready()} is checked against the exception's class.
 *
 * @param toThrow the exception the stubbed findPrimaryShardAsync future fails with
 */
740 private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) {
// Any shard name yields a failed lookup future.
741 doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
743 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
745 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
747 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
749 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
751 transactionProxy.delete(TestModel.TEST_PATH);
753 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
755 assertTrue(ready instanceof SingleCommitCohortProxy);
// The expected failure type is whatever concrete exception class the caller supplied.
757 verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
/** Runs the find-primary-shard failure scenario with a PrimaryNotFoundException. */
761 public void testWriteOnlyTxWithPrimaryNotFoundException() {
762 testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
/** Runs the find-primary-shard failure scenario with a NotInitializedException. */
766 public void testWriteOnlyTxWithNotInitializedException() {
767 testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
/** Runs the find-primary-shard failure scenario with a NoShardLeaderException. */
771 public void testWriteOnlyTxWithNoShardLeaderException() {
772 testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
/**
 * Readies a WRITE_ONLY transaction that writes to two paths, where the first actor answers its
 * BatchedModifications with a plain {@code Object}. The cohort must be a
 * ThreePhaseCommitCohortProxy, and IllegalArgumentException is the failure type handed to
 * verifyCohortFutures.
 */
776 public void testReadyWithInvalidReplyMessageType() {
// Turn on the write-only transaction optimizations for this test.
777 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
778 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
// Second actor is set up under the local name of TestModel.JUNK_QNAME.
780 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
781 TestModel.JUNK_QNAME.getLocalName());
// actorRef1 replies to any BatchedModifications with a bare Object.
783 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
784 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
786 expectBatchedModificationsReady(actorRef2);
788 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
790 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
791 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
793 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
795 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
797 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
798 IllegalArgumentException.class);
/**
 * Checks that a READ_ONLY TransactionProxy reports a non-null identifier whose string form
 * contains {@code memberName}.
 */
802 public void testGetIdentifier() {
803 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
804 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
806 Object id = transactionProxy.getIdentifier();
807 assertNotNull("getIdentifier returned null", id);
808 assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
/**
 * Checks that closing a READ_WRITE transaction after a read sends a CloseTransaction message
 * to the actor returned by setupActorContextWithInitialCreateTransaction.
 */
812 public void testClose() {
813 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
// Stub the read so the transaction has issued an operation against the actor before close().
815 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
816 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
818 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
820 transactionProxy.read(TestModel.TEST_PATH);
822 transactionProxy.close();
824 verify(mockActorContext).sendOperationAsync(
825 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
/** Callback through which a test supplies the operations to run against a TransactionProxy. */
828 private interface TransactionProxyOperation {
/**
 * Performs the operations under test.
 *
 * @param transactionProxy the proxy created by the calling helper
 */
829 void run(TransactionProxy transactionProxy);
/**
 * Creates a PrimaryShardInfo from a selection on {@code actorRef}'s path and
 * DataStoreVersions.CURRENT_VERSION.
 *
 * @param actorRef the shard actor the info should point at
 * @return the new PrimaryShardInfo
 */
832 private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
833 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
/**
 * Variant of newPrimaryShardInfo that also takes a DataTree.
 *
 * @param actorRef the shard actor the info should point at
 * @param dataTree NOTE(review): presumably supplied as the final constructor argument on the
 *                 continuation of the call below — confirm
 * @return the new PrimaryShardInfo
 */
836 private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
837 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
/**
 * Runs the throttling scenario with an outstanding-operations limit of 1 and
 * {@code shardFound} set to true.
 *
 * @param operation the transaction operations to time
 */
841 private void throttleOperation(final TransactionProxyOperation operation) {
842 throttleOperation(operation, 1, true);
/**
 * Runs the throttling scenario using the datastore context's operation timeout, converted from
 * milliseconds to nanoseconds, as the expected completion time.
 *
 * @param operation the transaction operations to time
 * @param outstandingOpsLimit forwarded unchanged to the four-argument overload
 * @param shardFound forwarded unchanged to the four-argument overload
 */
845 private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
846 final boolean shardFound) {
847 throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
848 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
/**
 * Runs {@code operation} against a READ_WRITE transaction whose CreateTransaction request is
 * answered with {@code incompleteFuture()}, then asserts that the elapsed time lies strictly
 * between {@code expectedCompletionTime} and twice that value.
 *
 * @param operation the transaction operations to time
 * @param outstandingOpsLimit the datastore context is built with shardBatchedModificationCount
 *                            set to this value minus one
 * @param shardFound NOTE(review): presumably selects between the successful and the failed
 *                   findPrimaryShardAsync stubs below — confirm
 * @param expectedCompletionTime lower bound for the elapsed time, in nanoseconds
 */
851 private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
852 final boolean shardFound, final long expectedCompletionTime) {
853 ActorSystem actorSystem = getSystem();
854 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
856 // Note that we set shardBatchedModificationCount to one less than the limit because TransactionProxy
857 // now allows one extra permit for ready
858 doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
859 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
860 .getDatastoreContext();
// Map the shard actor's path string to a selection on that same actor.
862 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
863 .actorSelection(shardActorRef.path().toString());
// Successful primary lookups for the default shard and for "cars".
866 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
867 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
868 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
869 .findPrimaryShardAsync(eq("cars"));
// Failed primary lookup for the default shard.
872 doReturn(Futures.failed(new Exception("not found")))
873 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// The READ_WRITE CreateTransaction request to the shard is answered with incompleteFuture().
876 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
877 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
880 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
// Time only the caller-supplied operations.
882 long start = System.nanoTime();
884 operation.run(transactionProxy);
886 long end = System.nanoTime();
// Elapsed time must exceed the expected completion time but stay below double that value.
888 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
889 expectedCompletionTime, end - start),
890 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
/**
 * Runs the completion scenario with {@code shardFound} set to true.
 *
 * @param operation the transaction operations to time
 */
894 private void completeOperation(final TransactionProxyOperation operation) {
895 completeOperation(operation, true);
/**
 * Runs {@code operation} against a READ_WRITE transaction whose CreateTransaction request is
 * answered with a successful CreateTransactionReply, then asserts that the elapsed time does
 * not exceed the datastore context's operation timeout.
 *
 * @param operation the transaction operations to time
 * @param shardFound NOTE(review): presumably selects between the successful and the failed
 *                   findPrimaryShardAsync stubs below — confirm
 */
898 private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
899 ActorSystem actorSystem = getSystem();
900 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
// Map the shard actor's path string to a selection on that same actor.
902 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
903 .actorSelection(shardActorRef.path().toString());
// Successful primary lookup for the default shard.
906 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
907 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// Failed primary lookup (PrimaryNotFoundException) for the default shard.
909 doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
910 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// A separate actor stands in for the transaction; its path goes into the CreateTransactionReply.
913 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
914 String actorPath = txActorRef.path().toString();
915 CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
916 DataStoreVersions.CURRENT_VERSION);
918 doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
// The READ_WRITE CreateTransaction request to the shard completes immediately with that reply.
920 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
921 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
924 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
// Time only the caller-supplied operations.
926 long start = System.nanoTime();
928 operation.run(transactionProxy);
930 long end = System.nanoTime();
// Upper bound: the operation timeout, converted from milliseconds to nanoseconds.
932 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
933 .getOperationTimeoutInMillis());
934 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
935 expected, end - start), end - start <= expected);
/**
 * Runs {@code operation} against a READ_WRITE transaction whose primary shard info is built
 * with {@code dataTree}, then asserts that the elapsed time does not exceed the datastore
 * context's operation timeout.
 *
 * @param operation the transaction operations to time
 * @param dataTree the tree passed to newPrimaryShardInfo for the default shard
 */
938 private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
939 ActorSystem actorSystem = getSystem();
940 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
// Map the shard actor's path string to a selection on that same actor.
942 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
943 .actorSelection(shardActorRef.path().toString());
// Primary lookup for the default shard succeeds with info that includes the supplied tree.
945 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
946 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
948 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
// Time only the caller-supplied operations.
950 long start = System.nanoTime();
952 operation.run(transactionProxy);
954 long end = System.nanoTime();
// Upper bound: the operation timeout, converted from milliseconds to nanoseconds.
956 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
957 .getOperationTimeoutInMillis());
958 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
959 end - start <= expected);
/**
 * Builds a Mockito DataTree whose {@code takeSnapshot()} returns a mocked snapshot, which in
 * turn returns a mocked modification from {@code newModification()}. No read behaviour is
 * stubbed on the modification here.
 *
 * @return a DataTree (presumably the mock configured here — confirm)
 */
962 private static DataTree createDataTree() {
963 DataTree dataTree = mock(DataTree.class);
964 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
965 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
967 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
968 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
/**
 * Builds the same mocked DataTree / snapshot / modification chain as the no-argument variant,
 * and additionally stubs the modification's {@code readNode} to return {@code readResponse}
 * for any YangInstanceIdentifier.
 *
 * @param readResponse the node every readNode call yields, wrapped in an Optional
 * @return a DataTree (presumably the mock configured here — confirm)
 */
973 private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
974 DataTree dataTree = mock(DataTree.class);
975 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
976 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
978 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
979 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
// Optional.of requires readResponse to be non-null.
980 doReturn(java.util.Optional.of(readResponse)).when(dataTreeModification).readNode(
981 any(YangInstanceIdentifier.class));
/**
 * Issues two writes of the same container node through completeOperationLocal, backed by the
 * mocked tree from the no-argument createDataTree().
 */
988 public void testWriteCompletionForLocalShard() {
989 completeOperationLocal(transactionProxy -> {
990 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
992 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
994 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
996 }, createDataTree());
/**
 * Issues two writes of the same container node through throttleOperation after calling
 * expectIncompleteBatchedModifications().
 */
1000 public void testWriteThrottlingWhenShardFound() {
1001 throttleOperation(transactionProxy -> {
1002 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1004 expectIncompleteBatchedModifications();
1006 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1008 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
/**
 * Issues two writes of the same container node through completeOperation, which asserts that
 * the elapsed time stays within the operation timeout.
 */
1013 public void testWriteThrottlingWhenShardNotFound() {
1014 // Confirm that there is no throttling when the Shard is not found
1015 completeOperation(transactionProxy -> {
1016 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1018 expectBatchedModifications(2);
1020 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1022 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
/**
 * Issues two writes of the same container node through completeOperation after calling
 * expectBatchedModifications(2).
 */
1029 public void testWriteCompletion() {
1030 completeOperation(transactionProxy -> {
1031 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1033 expectBatchedModifications(2);
1035 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1037 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
/**
 * Issues two merges of the same container node through throttleOperation after calling
 * expectIncompleteBatchedModifications().
 */
1042 public void testMergeThrottlingWhenShardFound() {
1043 throttleOperation(transactionProxy -> {
1044 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1046 expectIncompleteBatchedModifications();
1048 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1050 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
/**
 * Issues two merges of the same container node through completeOperation, which asserts that
 * the elapsed time stays within the operation timeout.
 */
1055 public void testMergeThrottlingWhenShardNotFound() {
1056 completeOperation(transactionProxy -> {
1057 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1059 expectBatchedModifications(2);
1061 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1063 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
/**
 * Issues two merges of the same container node through completeOperation after calling
 * expectBatchedModifications(2).
 */
1068 public void testMergeCompletion() {
1069 completeOperation(transactionProxy -> {
1070 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1072 expectBatchedModifications(2);
1074 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1076 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
/**
 * Issues two merges of the same container node through completeOperationLocal, backed by the
 * mocked tree from the no-argument createDataTree().
 */
1082 public void testMergeCompletionForLocalShard() {
1083 completeOperationLocal(transactionProxy -> {
1084 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1086 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1088 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1090 }, createDataTree());
/**
 * Issues two deletes of TestModel.TEST_PATH through throttleOperation after calling
 * expectIncompleteBatchedModifications().
 */
1095 public void testDeleteThrottlingWhenShardFound() {
1097 throttleOperation(transactionProxy -> {
1098 expectIncompleteBatchedModifications();
1100 transactionProxy.delete(TestModel.TEST_PATH);
1102 transactionProxy.delete(TestModel.TEST_PATH);
/**
 * Issues two deletes of TestModel.TEST_PATH through completeOperation, which asserts that the
 * elapsed time stays within the operation timeout.
 */
1108 public void testDeleteThrottlingWhenShardNotFound() {
1110 completeOperation(transactionProxy -> {
1111 expectBatchedModifications(2);
1113 transactionProxy.delete(TestModel.TEST_PATH);
1115 transactionProxy.delete(TestModel.TEST_PATH);
/**
 * Issues two deletes of TestModel.TEST_PATH through completeOperationLocal, backed by the
 * mocked tree from the no-argument createDataTree().
 */
1120 public void testDeleteCompletionForLocalShard() {
1121 completeOperationLocal(transactionProxy -> {
1123 transactionProxy.delete(TestModel.TEST_PATH);
1125 transactionProxy.delete(TestModel.TEST_PATH);
1126 }, createDataTree());
/**
 * Issues two deletes of TestModel.TEST_PATH through completeOperation after calling
 * expectBatchedModifications(2).
 */
1131 public void testDeleteCompletion() {
1132 completeOperation(transactionProxy -> {
1133 expectBatchedModifications(2);
1135 transactionProxy.delete(TestModel.TEST_PATH);
1137 transactionProxy.delete(TestModel.TEST_PATH);
/**
 * Issues two reads of TestModel.TEST_PATH through throttleOperation, with read requests
 * stubbed to return incompleteFuture().
 */
1143 public void testReadThrottlingWhenShardFound() {
1145 throttleOperation(transactionProxy -> {
// NOTE(review): this stubs the two-argument executeOperationAsync form, whereas
// testReadCompletion also passes any(Timeout.class) — confirm this is the overload the
// read path actually invokes.
1146 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1147 any(ActorSelection.class), eqReadData());
1149 transactionProxy.read(TestModel.TEST_PATH);
1151 transactionProxy.read(TestModel.TEST_PATH);
/**
 * Issues two reads of TestModel.TEST_PATH through completeOperation, with read requests
 * stubbed to return incompleteFuture(); completeOperation asserts that the elapsed time stays
 * within the operation timeout.
 */
1156 public void testReadThrottlingWhenShardNotFound() {
1158 completeOperation(transactionProxy -> {
// NOTE(review): two-argument executeOperationAsync form; testReadCompletion stubs the
// three-argument form with any(Timeout.class) — confirm which overload is invoked.
1159 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1160 any(ActorSelection.class), eqReadData());
1162 transactionProxy.read(TestModel.TEST_PATH);
1164 transactionProxy.read(TestModel.TEST_PATH);
/**
 * Issues two reads of TestModel.TEST_PATH through completeOperation, with read requests
 * stubbed to answer with a readDataReply for a container node.
 */
1170 public void testReadCompletion() {
1171 completeOperation(transactionProxy -> {
1172 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Any actor selection answers a read with the prepared reply.
1174 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1175 any(ActorSelection.class), eqReadData(), any(Timeout.class));
1177 transactionProxy.read(TestModel.TEST_PATH);
1179 transactionProxy.read(TestModel.TEST_PATH);
/**
 * Issues two reads of TestModel.TEST_PATH through completeOperationLocal, backed by a mocked
 * tree whose modification returns a container node from readNode.
 */
1185 public void testReadCompletionForLocalShard() {
1186 final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1187 completeOperationLocal(transactionProxy -> {
1188 transactionProxy.read(TestModel.TEST_PATH);
1190 transactionProxy.read(TestModel.TEST_PATH);
1191 }, createDataTree(nodeToRead));
/**
 * Issues two reads of TestModel.TEST_PATH through completeOperationLocal against the mocked
 * tree from the no-argument createDataTree(), in which readNode is left unstubbed.
 */
1196 public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1197 completeOperationLocal(transactionProxy -> {
1198 transactionProxy.read(TestModel.TEST_PATH);
1200 transactionProxy.read(TestModel.TEST_PATH);
1201 }, createDataTree());
/**
 * Issues two exists checks on TestModel.TEST_PATH through throttleOperation, with exists
 * requests stubbed to return incompleteFuture().
 */
1206 public void testExistsThrottlingWhenShardFound() {
1208 throttleOperation(transactionProxy -> {
// NOTE(review): two-argument executeOperationAsync form; testExistsCompletion stubs the
// three-argument form with any(Timeout.class) — confirm which overload is invoked.
1209 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1210 any(ActorSelection.class), eqDataExists());
1212 transactionProxy.exists(TestModel.TEST_PATH);
1214 transactionProxy.exists(TestModel.TEST_PATH);
/**
 * Issues two exists checks on TestModel.TEST_PATH through completeOperation, with exists
 * requests stubbed to return incompleteFuture(); completeOperation asserts that the elapsed
 * time stays within the operation timeout.
 */
1219 public void testExistsThrottlingWhenShardNotFound() {
1221 completeOperation(transactionProxy -> {
// NOTE(review): two-argument executeOperationAsync form; testExistsCompletion stubs the
// three-argument form with any(Timeout.class) — confirm which overload is invoked.
1222 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1223 any(ActorSelection.class), eqDataExists());
1225 transactionProxy.exists(TestModel.TEST_PATH);
1227 transactionProxy.exists(TestModel.TEST_PATH);
/**
 * Issues two exists checks on TestModel.TEST_PATH through completeOperation, with exists
 * requests stubbed to answer with dataExistsReply(true).
 */
1233 public void testExistsCompletion() {
1234 completeOperation(transactionProxy -> {
// Any actor selection answers an exists request with the prepared reply.
1235 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1236 any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1238 transactionProxy.exists(TestModel.TEST_PATH);
1240 transactionProxy.exists(TestModel.TEST_PATH);
/**
 * Issues two exists checks on TestModel.TEST_PATH through completeOperationLocal, backed by a
 * mocked tree whose modification returns a container node from readNode.
 */
1246 public void testExistsCompletionForLocalShard() {
1247 final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1248 completeOperationLocal(transactionProxy -> {
1249 transactionProxy.exists(TestModel.TEST_PATH);
1251 transactionProxy.exists(TestModel.TEST_PATH);
1252 }, createDataTree(nodeToRead));
/**
 * Issues two exists checks on TestModel.TEST_PATH through completeOperationLocal against the
 * mocked tree from the no-argument createDataTree(), in which readNode is left unstubbed.
 */
1257 public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1258 completeOperationLocal(transactionProxy -> {
1259 transactionProxy.exists(TestModel.TEST_PATH);
1261 transactionProxy.exists(TestModel.TEST_PATH);
1262 }, createDataTree());
/**
 * Performs a single write followed by ready() through throttleOperation, after calling
 * expectBatchedModifications(1).
 */
1267 public void testReadyThrottling() {
1269 throttleOperation(transactionProxy -> {
1270 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1272 expectBatchedModifications(1);
1274 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1276 transactionProxy.ready();
/**
 * Writes to the test path and to the cars path, then calls ready(), through throttleOperation
 * with a limit of 1, shardFound true, and an expected completion time of twice the operation
 * timeout.
 */
1281 public void testReadyThrottlingWithTwoTransactionContexts() {
1282 throttleOperation(transactionProxy -> {
1283 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1284 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1286 expectBatchedModifications(2);
1288 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1290 // Trying to write to Cars will cause another transaction context to get created
1291 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1293 // Now ready should block for both transaction contexts
1294 transactionProxy.ready();
// Operation timeout in nanoseconds, doubled because two transaction contexts are involved.
1295 }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1296 .getOperationTimeoutInMillis()) * 2);
/**
 * Issues eight modifications with shardBatchedModificationCount set to 3 and verifies that
 * they are captured as three BatchedModifications messages in issue order: three, three, and
 * a final pair that is sent by ready().
 *
 * @param type the transaction type the proxy is created with
 */
1299 private void testModificationOperationBatching(final TransactionType type) {
1300 int shardBatchedModificationCount = 3;
1301 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1303 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1305 expectBatchedModifications(actorRef, shardBatchedModificationCount);
// Paths and nodes for the three writes, three merges and two deletes issued below.
1307 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1308 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1310 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1311 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1313 YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1314 NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1316 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1317 NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1319 YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1320 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1322 YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1323 NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1325 YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1326 YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1328 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
// Eight modifications; with a batch size of 3 the first six fill two batches.
1330 transactionProxy.write(writePath1, writeNode1);
1331 transactionProxy.write(writePath2, writeNode2);
1332 transactionProxy.delete(deletePath1);
1333 transactionProxy.merge(mergePath1, mergeNode1);
1334 transactionProxy.merge(mergePath2, mergeNode2);
1335 transactionProxy.write(writePath3, writeNode3);
1336 transactionProxy.merge(mergePath3, mergeNode3);
1337 transactionProxy.delete(deletePath2);
1339 // This sends the last batch.
1340 transactionProxy.ready();
1342 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1343 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
// Each captured batch must hold its modifications in the order they were issued.
1345 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1346 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1348 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1349 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
// NOTE(review): the two boolean flags presumably mark the last batch as ready and
// immediately committed — confirm against verifyBatchedModifications.
1351 verifyBatchedModifications(batchedModifications.get(2), true, true,
1352 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1354 assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
/** Runs the modification batching scenario with a READ_WRITE transaction. */
1358 public void testReadWriteModificationOperationBatching() {
1359 testModificationOperationBatching(READ_WRITE);
/** Runs the modification batching scenario with a WRITE_ONLY transaction. */
1363 public void testWriteOnlyModificationOperationBatching() {
1364 testModificationOperationBatching(WRITE_ONLY);
/**
 * Runs the modification batching scenario with a WRITE_ONLY transaction after enabling the
 * write-only transaction optimizations on the datastore context builder.
 */
1368 public void testOptimizedWriteOnlyModificationOperationBatching() {
1369 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1370 testModificationOperationBatching(WRITE_ONLY);
/**
 * Verifies that pending modifications are sent as a BatchedModifications message before each
 * interleaved read or exists call, even though the configured batch size (10) is never
 * reached. Three batches are expected: the two writes, the two merges, and the delete.
 *
 * <p>Every future is awaited with a 5-second bound so that a missing stubbed reply fails the
 * test instead of blocking it indefinitely.
 *
 * @throws Exception if awaiting a read or exists future fails or times out
 */
1374 public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1376 int shardBatchedModificationCount = 10;
1377 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1379 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1381 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1383 final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1384 final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1386 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1387 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1389 final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1390 final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1392 YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1393 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1395 final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
// Stub the replies for the two reads and the exists check issued between modifications.
1397 doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1398 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1400 doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1401 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1403 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1404 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1406 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1408 transactionProxy.write(writePath1, writeNode1);
1409 transactionProxy.write(writePath2, writeNode2);
1411 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1413 assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1414 assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1416 transactionProxy.merge(mergePath1, mergeNode1);
1417 transactionProxy.merge(mergePath2, mergeNode2);
1419 readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1421 transactionProxy.delete(deletePath);
// Bounded wait, matching the two reads above; an unbounded get() could hang the test run.
1423 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
1424 assertEquals("Exists response", Boolean.TRUE, exists);
1426 assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1427 assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1429 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1430 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1432 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1433 new WriteModification(writePath2, writeNode2));
1435 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1436 new MergeModification(mergePath2, mergeNode2));
1438 verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
// Each batch must have been sent before the read/exists call that followed it.
1440 InOrder inOrder = Mockito.inOrder(mockActorContext);
1441 inOrder.verify(mockActorContext).executeOperationAsync(
1442 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1444 inOrder.verify(mockActorContext).executeOperationAsync(
1445 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1447 inOrder.verify(mockActorContext).executeOperationAsync(
1448 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1450 inOrder.verify(mockActorContext).executeOperationAsync(
1451 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1453 inOrder.verify(mockActorContext).executeOperationAsync(
1454 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1456 inOrder.verify(mockActorContext).executeOperationAsync(
1457 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1461 public void testReadRoot() throws InterruptedException, ExecutionException,
1462 java.util.concurrent.TimeoutException {
1463 SchemaContext schemaContext = SchemaContextHelper.full();
1464 Configuration configuration = mock(Configuration.class);
1465 doReturn(configuration).when(mockActorContext).getConfiguration();
1466 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1467 doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1469 NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1470 NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1472 setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1473 setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1475 doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1477 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1479 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1481 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1482 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1484 assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1486 NormalizedNode<?, ?> normalizedNode = readOptional.get();
1488 assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1490 @SuppressWarnings("unchecked")
1491 Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1493 for (NormalizedNode<?,?> node : collection) {
1494 assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1497 assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1498 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1500 assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1502 assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1503 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1505 assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
/**
 * Stubs mockActorContext so that a READ_ONLY transaction on {@code shardName} resolves a
 * primary shard, creates a transaction on a fresh actor, and answers a read of
 * YangInstanceIdentifier.EMPTY with {@code expectedNode}.
 *
 * @param shardName the shard name matched in findPrimaryShardAsync
 * @param expectedNode the node wrapped in the stubbed read reply
 */
1509 private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
1510 ActorSystem actorSystem = getSystem();
1511 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
// Map the shard actor's path string to a selection on that same actor.
1513 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1514 .actorSelection(shardActorRef.path().toString());
1516 doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1517 .findPrimaryShardAsync(eq(shardName));
// A second actor stands in for the transaction created on the shard.
1519 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1521 doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1522 .actorSelection(txActorRef.path().toString());
// The READ_ONLY CreateTransaction request to the shard yields a reply naming the transaction actor.
1524 doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1525 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1526 eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
// A root read sent to the transaction actor returns the expected node.
1528 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1529 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));