/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertThrows;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.ArgumentMatchers.any;
16 import static org.mockito.ArgumentMatchers.anyString;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.ArgumentMatchers.isA;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.doThrow;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
25 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
27 import akka.actor.ActorRef;
28 import akka.actor.ActorSelection;
29 import akka.actor.ActorSystem;
30 import akka.actor.Props;
31 import akka.dispatch.Futures;
32 import akka.util.Timeout;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.ImmutableSortedSet;
35 import com.google.common.collect.Sets;
36 import com.google.common.util.concurrent.FluentFuture;
37 import com.google.common.util.concurrent.FutureCallback;
38 import com.google.common.util.concurrent.MoreExecutors;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.util.Collection;
41 import java.util.List;
42 import java.util.Optional;
43 import java.util.SortedSet;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.Assert;
49 import org.junit.Test;
50 import org.mockito.ArgumentCaptor;
51 import org.mockito.InOrder;
52 import org.mockito.Mockito;
53 import org.opendaylight.controller.cluster.access.concepts.MemberName;
54 import org.opendaylight.controller.cluster.datastore.config.Configuration;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
58 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
60 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
65 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
66 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
67 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
68 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
69 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
70 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
71 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
72 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
73 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
74 import org.opendaylight.mdsal.common.api.ReadFailedException;
75 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
77 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
79 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
80 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
81 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
82 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
83 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
84 import scala.concurrent.Promise;
86 @SuppressWarnings({"resource", "checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
87 public class TransactionProxyTest extends AbstractTransactionProxyTest {
89 @SuppressWarnings("serial")
90 static class TestException extends RuntimeException {
94 FluentFuture<?> invoke(TransactionProxy proxy);
98 public void testRead() throws Exception {
99 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
101 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
103 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
104 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
106 assertEquals(Optional.empty(), transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
108 NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
110 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
111 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
113 assertEquals(Optional.of(expectedNode), transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
116 @Test(expected = ReadFailedException.class)
117 public void testReadWithInvalidReplyMessageType() throws Throwable {
118 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
120 doReturn(Futures.successful(new Object())).when(mockActorContext)
121 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
123 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
126 transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
127 } catch (ExecutionException e) {
132 @Test(expected = TestException.class)
133 public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
134 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
136 doReturn(Futures.failed(new TestException())).when(mockActorContext)
137 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
139 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
141 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
144 private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
146 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
148 if (exToThrow instanceof PrimaryNotFoundException) {
149 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
151 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
152 .findPrimaryShardAsync(anyString());
155 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
156 any(ActorSelection.class), any(), any(Timeout.class));
158 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
160 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
163 private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Throwable {
164 testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
167 @Test(expected = PrimaryNotFoundException.class)
168 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
169 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
172 @Test(expected = TestException.class)
173 public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
174 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
175 new TestException()));
178 @Test(expected = TestException.class)
179 public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
180 testReadWithExceptionOnInitialCreateTransaction(new TestException());
184 public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
185 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
187 NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
189 expectBatchedModifications(actorRef, 1);
191 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
192 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
194 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
196 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
198 assertEquals(Optional.of(expectedNode), transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
200 InOrder inOrder = Mockito.inOrder(mockActorContext);
201 inOrder.verify(mockActorContext).executeOperationAsync(
202 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
204 inOrder.verify(mockActorContext).executeOperationAsync(
205 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
209 public void testReadPreConditionCheck() {
210 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
211 assertThrows(IllegalStateException.class, () -> transactionProxy.read(TestModel.TEST_PATH));
214 @Test(expected = IllegalArgumentException.class)
215 public void testInvalidCreateTransactionReply() throws Throwable {
216 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
218 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
219 .actorSelection(actorRef.path().toString());
221 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
222 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
224 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
225 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
228 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
230 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
234 public void testExists() throws Exception {
235 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
237 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
239 doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
240 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
242 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
244 assertEquals("Exists response", Boolean.FALSE, exists);
246 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
247 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
249 exists = transactionProxy.exists(TestModel.TEST_PATH).get();
251 assertEquals("Exists response", Boolean.TRUE, exists);
254 @Test(expected = PrimaryNotFoundException.class)
255 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
256 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
257 proxy -> proxy.exists(TestModel.TEST_PATH));
260 @Test(expected = ReadFailedException.class)
261 public void testExistsWithInvalidReplyMessageType() throws Throwable {
262 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
264 doReturn(Futures.successful(new Object())).when(mockActorContext)
265 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
267 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
270 transactionProxy.exists(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
271 } catch (ExecutionException e) {
276 @Test(expected = TestException.class)
277 public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
278 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
280 doReturn(Futures.failed(new TestException())).when(mockActorContext)
281 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
283 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
285 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
289 public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
290 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
292 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
294 expectBatchedModifications(actorRef, 1);
296 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
297 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
299 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
301 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
303 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
305 assertEquals("Exists response", Boolean.TRUE, exists);
307 InOrder inOrder = Mockito.inOrder(mockActorContext);
308 inOrder.verify(mockActorContext).executeOperationAsync(
309 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
311 inOrder.verify(mockActorContext).executeOperationAsync(
312 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
315 @Test(expected = IllegalStateException.class)
316 public void testExistsPreConditionCheck() {
317 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
318 transactionProxy.exists(TestModel.TEST_PATH);
322 public void testWrite() {
323 dataStoreContextBuilder.shardBatchedModificationCount(1);
324 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
326 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
328 expectBatchedModifications(actorRef, 1);
330 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
332 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
334 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
338 @SuppressWarnings("checkstyle:IllegalCatch")
339 public void testWriteAfterAsyncRead() throws Exception {
340 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
341 DefaultShardStrategy.DEFAULT_SHARD);
343 Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
344 doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
345 eq(getSystem().actorSelection(actorRef.path())),
346 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
348 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
349 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
351 expectBatchedModificationsReady(actorRef);
353 final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
355 final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
357 final CountDownLatch readComplete = new CountDownLatch(1);
358 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
359 com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
360 new FutureCallback<Optional<NormalizedNode>>() {
362 public void onSuccess(final Optional<NormalizedNode> result) {
364 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
365 } catch (Exception e) {
368 readComplete.countDown();
373 public void onFailure(final Throwable failure) {
374 caughtEx.set(failure);
375 readComplete.countDown();
377 }, MoreExecutors.directExecutor());
379 createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
381 Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
383 final Throwable t = caughtEx.get();
385 Throwables.propagateIfPossible(t, Exception.class);
386 throw new RuntimeException(t);
389 // This sends the batched modification.
390 transactionProxy.ready();
392 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
395 @Test(expected = IllegalStateException.class)
396 public void testWritePreConditionCheck() {
397 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
398 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
401 @Test(expected = IllegalStateException.class)
402 public void testWriteAfterReadyPreConditionCheck() {
403 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
405 transactionProxy.ready();
407 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
411 public void testMerge() {
412 dataStoreContextBuilder.shardBatchedModificationCount(1);
413 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
415 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
417 expectBatchedModifications(actorRef, 1);
419 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
421 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
423 verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
427 public void testDelete() {
428 dataStoreContextBuilder.shardBatchedModificationCount(1);
429 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
431 expectBatchedModifications(actorRef, 1);
433 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
435 transactionProxy.delete(TestModel.TEST_PATH);
437 verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
441 public void testReadWrite() {
442 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
444 final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
446 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
447 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
449 expectBatchedModifications(actorRef, 1);
451 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
453 transactionProxy.read(TestModel.TEST_PATH);
455 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
457 transactionProxy.read(TestModel.TEST_PATH);
459 transactionProxy.read(TestModel.TEST_PATH);
461 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
462 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
464 verifyBatchedModifications(batchedModifications.get(0), false,
465 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
469 public void testReadyWithReadWrite() {
470 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
472 final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
474 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
475 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
477 expectBatchedModificationsReady(actorRef, true);
479 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
481 transactionProxy.read(TestModel.TEST_PATH);
483 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
485 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
487 assertTrue(ready instanceof SingleCommitCohortProxy);
489 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
491 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
492 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
494 verifyBatchedModifications(batchedModifications.get(0), true, true,
495 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
497 assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
501 public void testReadyWithNoModifications() {
502 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
504 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
505 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
507 expectBatchedModificationsReady(actorRef, true);
509 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
511 transactionProxy.read(TestModel.TEST_PATH);
513 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
515 assertTrue(ready instanceof SingleCommitCohortProxy);
517 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
519 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
520 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
522 verifyBatchedModifications(batchedModifications.get(0), true, true);
526 public void testReadyWithMultipleShardWrites() {
527 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
529 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
530 TestModel.JUNK_QNAME.getLocalName());
532 expectBatchedModificationsReady(actorRef1);
533 expectBatchedModificationsReady(actorRef2);
535 ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
537 doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
538 .actorSelection(actorRef3.path().toString());
540 doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
541 .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
543 expectReadyLocalTransaction(actorRef3, false);
545 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
547 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
548 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
549 transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
551 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
553 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
555 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
556 actorSelection(actorRef2), actorSelection(actorRef3));
558 SortedSet<String> expShardNames =
559 ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
560 TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
562 ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
563 verify(mockActorContext).executeOperationAsync(
564 eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
565 assertEquals("Participating shards", Optional.of(expShardNames),
566 batchedMods.getValue().getParticipatingShardNames());
568 batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
569 verify(mockActorContext).executeOperationAsync(
570 eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
571 assertEquals("Participating shards", Optional.of(expShardNames),
572 batchedMods.getValue().getParticipatingShardNames());
574 ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
575 verify(mockActorContext).executeOperationAsync(
576 eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
577 assertEquals("Participating shards", Optional.of(expShardNames),
578 readyLocalTx.getValue().getParticipatingShardNames());
582 public void testReadyWithWriteOnlyAndLastBatchPending() {
583 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
585 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
587 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
589 expectBatchedModificationsReady(actorRef, true);
591 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
593 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
595 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
597 assertTrue(ready instanceof SingleCommitCohortProxy);
599 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
601 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
602 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
604 verifyBatchedModifications(batchedModifications.get(0), true, true,
605 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
609 public void testReadyWithWriteOnlyAndLastBatchEmpty() {
610 dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
611 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
613 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
615 expectBatchedModificationsReady(actorRef, true);
617 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
619 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
621 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
623 assertTrue(ready instanceof SingleCommitCohortProxy);
625 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
627 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
628 assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
630 verifyBatchedModifications(batchedModifications.get(0), false,
631 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
633 verifyBatchedModifications(batchedModifications.get(1), true, true);
637 public void testReadyWithReplyFailure() {
638 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
640 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
642 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
644 expectFailedBatchedModifications(actorRef);
646 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
648 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
650 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
652 assertTrue(ready instanceof SingleCommitCohortProxy);
654 verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
658 public void testReadyWithDebugContextEnabled() {
659 dataStoreContextBuilder.transactionDebugContextEnabled(true);
661 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
663 expectBatchedModificationsReady(actorRef, true);
665 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
667 transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
669 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
671 assertTrue(ready instanceof DebugThreePhaseCommitCohort);
673 verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
677 public void testReadyWithLocalTransaction() {
678 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
680 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
681 .actorSelection(shardActorRef.path().toString());
683 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
684 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
686 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
688 expectReadyLocalTransaction(shardActorRef, true);
690 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
691 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
693 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
694 assertTrue(ready instanceof SingleCommitCohortProxy);
695 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
697 ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
698 verify(mockActorContext).executeOperationAsync(
699 eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
700 assertFalse("Participating shards present", readyLocalTx.getValue().getParticipatingShardNames().isPresent());
704 public void testReadyWithLocalTransactionWithFailure() {
705 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
707 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
708 .actorSelection(shardActorRef.path().toString());
710 DataTree mockDataTree = createDataTree();
711 DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
712 doThrow(new RuntimeException("mock")).when(mockModification).ready();
714 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
715 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
717 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
719 expectReadyLocalTransaction(shardActorRef, true);
721 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
722 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
724 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
725 assertTrue(ready instanceof SingleCommitCohortProxy);
726 verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
729 private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) {
730 doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
732 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
734 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
736 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
738 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
740 transactionProxy.delete(TestModel.TEST_PATH);
742 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
744 assertTrue(ready instanceof SingleCommitCohortProxy);
746 verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
750 public void testWriteOnlyTxWithPrimaryNotFoundException() {
751 testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
755 public void testWriteOnlyTxWithNotInitializedException() {
756 testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
760 public void testWriteOnlyTxWithNoShardLeaderException() {
761 testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
765 public void testReadyWithInvalidReplyMessageType() {
766 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
767 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
769 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
770 TestModel.JUNK_QNAME.getLocalName());
772 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
773 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
775 expectBatchedModificationsReady(actorRef2);
777 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
779 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
780 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
782 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
784 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
786 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
787 IllegalArgumentException.class);
791 public void testGetIdentifier() {
792 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
793 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
795 Object id = transactionProxy.getIdentifier();
796 assertNotNull("getIdentifier returned null", id);
797 assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
801 public void testClose() {
802 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
804 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
805 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
807 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
809 transactionProxy.read(TestModel.TEST_PATH);
811 transactionProxy.close();
813 verify(mockActorContext).sendOperationAsync(
814 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
817 private interface TransactionProxyOperation {
818 void run(TransactionProxy transactionProxy);
821 private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
822 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
825 private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
826 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
830 private void throttleOperation(final TransactionProxyOperation operation) {
831 throttleOperation(operation, 1, true);
834 private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
835 final boolean shardFound) {
836 throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
837 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
840 private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
841 final boolean shardFound, final long expectedCompletionTime) {
842 ActorSystem actorSystem = getSystem();
843 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
845 // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
846 // we now allow one extra permit to be allowed for ready
847 doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
848 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
849 .getDatastoreContext();
851 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
852 .actorSelection(shardActorRef.path().toString());
855 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
856 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
857 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
858 .findPrimaryShardAsync(eq("cars"));
861 doReturn(Futures.failed(new Exception("not found")))
862 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
865 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
866 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
869 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
871 long start = System.nanoTime();
873 operation.run(transactionProxy);
875 long end = System.nanoTime();
877 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
878 expectedCompletionTime, end - start),
879 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
883 private void completeOperation(final TransactionProxyOperation operation) {
884 completeOperation(operation, true);
887 private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
888 ActorSystem actorSystem = getSystem();
889 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
891 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
892 .actorSelection(shardActorRef.path().toString());
895 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
896 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
898 doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
899 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
902 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
903 String actorPath = txActorRef.path().toString();
904 CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
905 DataStoreVersions.CURRENT_VERSION);
907 doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
909 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
910 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
913 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
915 long start = System.nanoTime();
917 operation.run(transactionProxy);
919 long end = System.nanoTime();
921 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
922 .getOperationTimeoutInMillis());
923 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
924 expected, end - start), end - start <= expected);
927 private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
928 ActorSystem actorSystem = getSystem();
929 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
931 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
932 .actorSelection(shardActorRef.path().toString());
934 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
935 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
937 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
939 long start = System.nanoTime();
941 operation.run(transactionProxy);
943 long end = System.nanoTime();
945 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
946 .getOperationTimeoutInMillis());
947 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
948 end - start <= expected);
951 private static DataTree createDataTree() {
952 DataTree dataTree = mock(DataTree.class);
953 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
954 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
956 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
957 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
962 private static DataTree createDataTree(final NormalizedNode readResponse) {
963 DataTree dataTree = mock(DataTree.class);
964 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
965 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
967 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
968 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
969 doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
976 public void testWriteCompletionForLocalShard() {
977 completeOperationLocal(transactionProxy -> {
978 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
980 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
982 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
984 }, createDataTree());
988 public void testWriteThrottlingWhenShardFound() {
989 throttleOperation(transactionProxy -> {
990 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
992 expectIncompleteBatchedModifications();
994 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
996 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1001 public void testWriteThrottlingWhenShardNotFound() {
1002 // Confirm that there is no throttling when the Shard is not found
1003 completeOperation(transactionProxy -> {
1004 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1006 expectBatchedModifications(2);
1008 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1010 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1017 public void testWriteCompletion() {
1018 completeOperation(transactionProxy -> {
1019 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1021 expectBatchedModifications(2);
1023 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1025 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1030 public void testMergeThrottlingWhenShardFound() {
1031 throttleOperation(transactionProxy -> {
1032 NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1034 expectIncompleteBatchedModifications();
1036 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1038 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1043 public void testMergeThrottlingWhenShardNotFound() {
1044 completeOperation(transactionProxy -> {
1045 NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1047 expectBatchedModifications(2);
1049 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1051 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1056 public void testMergeCompletion() {
1057 completeOperation(transactionProxy -> {
1058 NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1060 expectBatchedModifications(2);
1062 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1064 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1070 public void testMergeCompletionForLocalShard() {
1071 completeOperationLocal(transactionProxy -> {
1072 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1074 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1076 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1078 }, createDataTree());
1083 public void testDeleteThrottlingWhenShardFound() {
1085 throttleOperation(transactionProxy -> {
1086 expectIncompleteBatchedModifications();
1088 transactionProxy.delete(TestModel.TEST_PATH);
1090 transactionProxy.delete(TestModel.TEST_PATH);
1096 public void testDeleteThrottlingWhenShardNotFound() {
1098 completeOperation(transactionProxy -> {
1099 expectBatchedModifications(2);
1101 transactionProxy.delete(TestModel.TEST_PATH);
1103 transactionProxy.delete(TestModel.TEST_PATH);
1108 public void testDeleteCompletionForLocalShard() {
1109 completeOperationLocal(transactionProxy -> {
1111 transactionProxy.delete(TestModel.TEST_PATH);
1113 transactionProxy.delete(TestModel.TEST_PATH);
1114 }, createDataTree());
1119 public void testDeleteCompletion() {
1120 completeOperation(transactionProxy -> {
1121 expectBatchedModifications(2);
1123 transactionProxy.delete(TestModel.TEST_PATH);
1125 transactionProxy.delete(TestModel.TEST_PATH);
1131 public void testReadThrottlingWhenShardFound() {
1133 throttleOperation(transactionProxy -> {
1134 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1135 any(ActorSelection.class), eqReadData());
1137 transactionProxy.read(TestModel.TEST_PATH);
1139 transactionProxy.read(TestModel.TEST_PATH);
1144 public void testReadThrottlingWhenShardNotFound() {
1146 completeOperation(transactionProxy -> {
1147 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1148 any(ActorSelection.class), eqReadData());
1150 transactionProxy.read(TestModel.TEST_PATH);
1152 transactionProxy.read(TestModel.TEST_PATH);
1158 public void testReadCompletion() {
1159 completeOperation(transactionProxy -> {
1160 NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1162 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1163 any(ActorSelection.class), eqReadData(), any(Timeout.class));
1165 transactionProxy.read(TestModel.TEST_PATH);
1167 transactionProxy.read(TestModel.TEST_PATH);
1173 public void testReadCompletionForLocalShard() {
1174 final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1175 completeOperationLocal(transactionProxy -> {
1176 transactionProxy.read(TestModel.TEST_PATH);
1178 transactionProxy.read(TestModel.TEST_PATH);
1179 }, createDataTree(nodeToRead));
1184 public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1185 completeOperationLocal(transactionProxy -> {
1186 transactionProxy.read(TestModel.TEST_PATH);
1188 transactionProxy.read(TestModel.TEST_PATH);
1189 }, createDataTree());
1194 public void testExistsThrottlingWhenShardFound() {
1196 throttleOperation(transactionProxy -> {
1197 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1198 any(ActorSelection.class), eqDataExists());
1200 transactionProxy.exists(TestModel.TEST_PATH);
1202 transactionProxy.exists(TestModel.TEST_PATH);
1207 public void testExistsThrottlingWhenShardNotFound() {
1209 completeOperation(transactionProxy -> {
1210 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1211 any(ActorSelection.class), eqDataExists());
1213 transactionProxy.exists(TestModel.TEST_PATH);
1215 transactionProxy.exists(TestModel.TEST_PATH);
1221 public void testExistsCompletion() {
1222 completeOperation(transactionProxy -> {
1223 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1224 any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1226 transactionProxy.exists(TestModel.TEST_PATH);
1228 transactionProxy.exists(TestModel.TEST_PATH);
1234 public void testExistsCompletionForLocalShard() {
1235 final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1236 completeOperationLocal(transactionProxy -> {
1237 transactionProxy.exists(TestModel.TEST_PATH);
1239 transactionProxy.exists(TestModel.TEST_PATH);
1240 }, createDataTree(nodeToRead));
1245 public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1246 completeOperationLocal(transactionProxy -> {
1247 transactionProxy.exists(TestModel.TEST_PATH);
1249 transactionProxy.exists(TestModel.TEST_PATH);
1250 }, createDataTree());
1255 public void testReadyThrottling() {
1257 throttleOperation(transactionProxy -> {
1258 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1260 expectBatchedModifications(1);
1262 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1264 transactionProxy.ready();
1269 public void testReadyThrottlingWithTwoTransactionContexts() {
1270 throttleOperation(transactionProxy -> {
1271 NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1272 NormalizedNode carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1274 expectBatchedModifications(2);
1276 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1278 // Trying to write to Cars will cause another transaction context to get created
1279 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1281 // Now ready should block for both transaction contexts
1282 transactionProxy.ready();
1283 }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1284 .getOperationTimeoutInMillis()) * 2);
1287 private void testModificationOperationBatching(final TransactionType type) {
1288 int shardBatchedModificationCount = 3;
1289 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1291 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1293 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1295 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1296 NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1298 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1299 NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1301 YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1302 NormalizedNode writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1304 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1305 NormalizedNode mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1307 YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1308 NormalizedNode mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1310 YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1311 NormalizedNode mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1313 YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1314 YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1316 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1318 transactionProxy.write(writePath1, writeNode1);
1319 transactionProxy.write(writePath2, writeNode2);
1320 transactionProxy.delete(deletePath1);
1321 transactionProxy.merge(mergePath1, mergeNode1);
1322 transactionProxy.merge(mergePath2, mergeNode2);
1323 transactionProxy.write(writePath3, writeNode3);
1324 transactionProxy.merge(mergePath3, mergeNode3);
1325 transactionProxy.delete(deletePath2);
1327 // This sends the last batch.
1328 transactionProxy.ready();
1330 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1331 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1333 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1334 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1336 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1337 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1339 verifyBatchedModifications(batchedModifications.get(2), true, true,
1340 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1342 assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1346 public void testReadWriteModificationOperationBatching() {
1347 testModificationOperationBatching(READ_WRITE);
1351 public void testWriteOnlyModificationOperationBatching() {
1352 testModificationOperationBatching(WRITE_ONLY);
1356 public void testOptimizedWriteOnlyModificationOperationBatching() {
1357 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1358 testModificationOperationBatching(WRITE_ONLY);
1362 public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1364 int shardBatchedModificationCount = 10;
1365 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1367 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1369 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1371 final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1372 final NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1374 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1375 NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1377 final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1378 final NormalizedNode mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1380 YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1381 NormalizedNode mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1383 final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1385 doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1386 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1388 doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1389 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1391 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1392 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1394 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1396 transactionProxy.write(writePath1, writeNode1);
1397 transactionProxy.write(writePath2, writeNode2);
1399 Optional<NormalizedNode> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1401 assertEquals("Response NormalizedNode", Optional.of(writeNode2), readOptional);
1403 transactionProxy.merge(mergePath1, mergeNode1);
1404 transactionProxy.merge(mergePath2, mergeNode2);
1406 readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1408 transactionProxy.delete(deletePath);
1410 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
1411 assertEquals("Exists response", Boolean.TRUE, exists);
1413 assertEquals("Response NormalizedNode", Optional.of(mergeNode2), readOptional);
1415 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1416 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1418 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1419 new WriteModification(writePath2, writeNode2));
1421 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1422 new MergeModification(mergePath2, mergeNode2));
1424 verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1426 InOrder inOrder = Mockito.inOrder(mockActorContext);
1427 inOrder.verify(mockActorContext).executeOperationAsync(
1428 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1430 inOrder.verify(mockActorContext).executeOperationAsync(
1431 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1433 inOrder.verify(mockActorContext).executeOperationAsync(
1434 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1436 inOrder.verify(mockActorContext).executeOperationAsync(
1437 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1439 inOrder.verify(mockActorContext).executeOperationAsync(
1440 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1442 inOrder.verify(mockActorContext).executeOperationAsync(
1443 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1447 public void testReadRoot() throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
1448 EffectiveModelContext schemaContext = SchemaContextHelper.full();
1449 Configuration configuration = mock(Configuration.class);
1450 doReturn(configuration).when(mockActorContext).getConfiguration();
1451 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1452 doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1454 NormalizedNode expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1455 NormalizedNode expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1457 setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1458 setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1460 doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1462 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1464 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1466 Optional<NormalizedNode> readOptional = transactionProxy.read(
1467 YangInstanceIdentifier.empty()).get(5, TimeUnit.SECONDS);
1469 assertTrue("NormalizedNode isPresent", readOptional.isPresent());
1471 NormalizedNode normalizedNode = readOptional.orElseThrow();
1473 assertTrue("Expect value to be a Collection", normalizedNode.body() instanceof Collection);
1475 @SuppressWarnings("unchecked")
1476 Collection<NormalizedNode> collection = (Collection<NormalizedNode>) normalizedNode.body();
1478 for (NormalizedNode node : collection) {
1479 assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1482 assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1483 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1485 assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1487 assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1488 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1490 assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1494 private void setUpReadData(final String shardName, final NormalizedNode expectedNode) {
1495 ActorSystem actorSystem = getSystem();
1496 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1498 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1499 .actorSelection(shardActorRef.path().toString());
1501 doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1502 .findPrimaryShardAsync(eq(shardName));
1504 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1506 doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1507 .actorSelection(txActorRef.path().toString());
1509 doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1510 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1511 eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1513 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1514 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.empty()), any(Timeout.class));