2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.anyString;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.doThrow;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.ActorSystem;
29 import akka.actor.Props;
30 import akka.dispatch.Futures;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.ImmutableSortedSet;
35 import com.google.common.collect.Sets;
36 import com.google.common.util.concurrent.CheckedFuture;
37 import com.google.common.util.concurrent.FutureCallback;
38 import com.google.common.util.concurrent.MoreExecutors;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.util.Collection;
41 import java.util.List;
42 import java.util.SortedSet;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.Assert;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.InOrder;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.access.concepts.MemberName;
53 import org.opendaylight.controller.cluster.datastore.config.Configuration;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
63 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
64 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
65 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
67 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
68 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
69 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
70 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
71 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
72 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
73 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
74 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
75 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
76 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
77 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
79 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
80 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
81 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
82 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
83 import scala.concurrent.Promise;
85 @SuppressWarnings("resource")
86 public class TransactionProxyTest extends AbstractTransactionProxyTest {
88 @SuppressWarnings("serial")
89 static class TestException extends RuntimeException {
93 CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
97 public void testRead() throws Exception {
98 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
100 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
102 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
103 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
105 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
106 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
108 assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
110 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
112 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
113 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
115 readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
117 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
119 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
122 @Test(expected = ReadFailedException.class)
123 public void testReadWithInvalidReplyMessageType() throws Exception {
124 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
126 doReturn(Futures.successful(new Object())).when(mockActorContext)
127 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
129 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
131 transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
134 @Test(expected = TestException.class)
135 public void testReadWithAsyncRemoteOperatonFailure() throws Exception {
136 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
138 doReturn(Futures.failed(new TestException())).when(mockActorContext)
139 .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
141 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
143 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
146 private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
148 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
150 if (exToThrow instanceof PrimaryNotFoundException) {
151 doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
153 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
154 .findPrimaryShardAsync(anyString());
157 doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
158 any(ActorSelection.class), any(), any(Timeout.class));
160 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
162 propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
165 private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Exception {
166 testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
169 @Test(expected = PrimaryNotFoundException.class)
170 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
171 testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
174 @Test(expected = TimeoutException.class)
175 public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
176 testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
177 new Exception("reason")));
180 @Test(expected = TestException.class)
181 public void testReadWhenAnyOtherExceptionIsThrown() throws Exception {
182 testReadWithExceptionOnInitialCreateTransaction(new TestException());
186 public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
187 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
189 NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
191 expectBatchedModifications(actorRef, 1);
193 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
194 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
196 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
198 transactionProxy.write(TestModel.TEST_PATH, expectedNode);
200 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
201 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
203 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
204 assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
206 InOrder inOrder = Mockito.inOrder(mockActorContext);
207 inOrder.verify(mockActorContext).executeOperationAsync(
208 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
210 inOrder.verify(mockActorContext).executeOperationAsync(
211 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
214 @Test(expected = IllegalStateException.class)
215 public void testReadPreConditionCheck() {
216 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
217 transactionProxy.read(TestModel.TEST_PATH);
220 @Test(expected = IllegalArgumentException.class)
221 public void testInvalidCreateTransactionReply() throws Exception {
222 ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
224 doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
225 .actorSelection(actorRef.path().toString());
227 doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
228 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
230 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
231 eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
234 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
236 propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
240 public void testExists() throws Exception {
241 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
243 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
245 doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
246 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
248 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
250 assertEquals("Exists response", false, exists);
252 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
253 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
255 exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
257 assertEquals("Exists response", true, exists);
260 @Test(expected = PrimaryNotFoundException.class)
261 public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
262 testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
263 proxy -> proxy.exists(TestModel.TEST_PATH));
266 @Test(expected = ReadFailedException.class)
267 public void testExistsWithInvalidReplyMessageType() throws Exception {
268 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
270 doReturn(Futures.successful(new Object())).when(mockActorContext)
271 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
273 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
275 transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
278 @Test(expected = TestException.class)
279 public void testExistsWithAsyncRemoteOperatonFailure() throws Exception {
280 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
282 doReturn(Futures.failed(new TestException())).when(mockActorContext)
283 .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
285 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
287 propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
291 public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
292 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
294 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
296 expectBatchedModifications(actorRef, 1);
298 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
299 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
301 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
303 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
305 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
307 assertEquals("Exists response", true, exists);
309 InOrder inOrder = Mockito.inOrder(mockActorContext);
310 inOrder.verify(mockActorContext).executeOperationAsync(
311 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
313 inOrder.verify(mockActorContext).executeOperationAsync(
314 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
317 @Test(expected = IllegalStateException.class)
318 public void testExistsPreConditionCheck() {
319 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
320 transactionProxy.exists(TestModel.TEST_PATH);
324 public void testWrite() throws Exception {
325 dataStoreContextBuilder.shardBatchedModificationCount(1);
326 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
328 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
330 expectBatchedModifications(actorRef, 1);
332 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
334 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
336 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
340 @SuppressWarnings("checkstyle:IllegalCatch")
341 public void testWriteAfterAsyncRead() throws Exception {
342 ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
343 DefaultShardStrategy.DEFAULT_SHARD);
345 Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
346 doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
347 eq(getSystem().actorSelection(actorRef.path())),
348 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
350 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
351 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
353 expectBatchedModificationsReady(actorRef);
355 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
357 final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
359 final CountDownLatch readComplete = new CountDownLatch(1);
360 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
361 com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
362 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
364 public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
366 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
367 } catch (Exception e) {
370 readComplete.countDown();
375 public void onFailure(final Throwable failure) {
376 caughtEx.set(failure);
377 readComplete.countDown();
379 }, MoreExecutors.directExecutor());
381 createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
383 Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
385 final Throwable t = caughtEx.get();
387 Throwables.propagateIfPossible(t, Exception.class);
388 throw new RuntimeException(t);
391 // This sends the batched modification.
392 transactionProxy.ready();
394 verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
397 @Test(expected = IllegalStateException.class)
398 public void testWritePreConditionCheck() {
399 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
400 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
403 @Test(expected = IllegalStateException.class)
404 public void testWriteAfterReadyPreConditionCheck() {
405 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
407 transactionProxy.ready();
409 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
413 public void testMerge() throws Exception {
414 dataStoreContextBuilder.shardBatchedModificationCount(1);
415 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
417 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
419 expectBatchedModifications(actorRef, 1);
421 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
423 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
425 verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
429 public void testDelete() throws Exception {
430 dataStoreContextBuilder.shardBatchedModificationCount(1);
431 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
433 expectBatchedModifications(actorRef, 1);
435 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
437 transactionProxy.delete(TestModel.TEST_PATH);
439 verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
443 public void testReadWrite() throws Exception {
444 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
446 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
448 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
449 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
451 expectBatchedModifications(actorRef, 1);
453 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
455 transactionProxy.read(TestModel.TEST_PATH);
457 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
459 transactionProxy.read(TestModel.TEST_PATH);
461 transactionProxy.read(TestModel.TEST_PATH);
463 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
464 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
466 verifyBatchedModifications(batchedModifications.get(0), false,
467 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
471 public void testReadyWithReadWrite() throws Exception {
472 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
474 final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
476 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
477 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
479 expectBatchedModificationsReady(actorRef, true);
481 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
483 transactionProxy.read(TestModel.TEST_PATH);
485 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
487 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
489 assertTrue(ready instanceof SingleCommitCohortProxy);
491 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
493 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
494 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
496 verifyBatchedModifications(batchedModifications.get(0), true, true,
497 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
499 assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
503 public void testReadyWithNoModifications() throws Exception {
504 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
506 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
507 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
509 expectBatchedModificationsReady(actorRef, true);
511 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
513 transactionProxy.read(TestModel.TEST_PATH);
515 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
517 assertTrue(ready instanceof SingleCommitCohortProxy);
519 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
521 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
522 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
524 verifyBatchedModifications(batchedModifications.get(0), true, true);
528 public void testReadyWithMultipleShardWrites() throws Exception {
529 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
531 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
532 TestModel.JUNK_QNAME.getLocalName());
534 expectBatchedModificationsReady(actorRef1);
535 expectBatchedModificationsReady(actorRef2);
537 ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
539 doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
540 .actorSelection(actorRef3.path().toString());
542 doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
543 .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
545 expectReadyLocalTransaction(actorRef3, false);
547 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
549 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
550 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
551 transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
553 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
555 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
557 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
558 actorSelection(actorRef2), actorSelection(actorRef3));
560 SortedSet<String> expShardNames =
561 ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
562 TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
564 ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
565 verify(mockActorContext).executeOperationAsync(
566 eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
567 assertEquals("Participating shards present", true,
568 batchedMods.getValue().getParticipatingShardNames().isPresent());
569 assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
571 batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
572 verify(mockActorContext).executeOperationAsync(
573 eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
574 assertEquals("Participating shards present", true,
575 batchedMods.getValue().getParticipatingShardNames().isPresent());
576 assertEquals("Participating shards", expShardNames, batchedMods.getValue().getParticipatingShardNames().get());
578 ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
579 verify(mockActorContext).executeOperationAsync(
580 eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
581 assertEquals("Participating shards present", true,
582 readyLocalTx.getValue().getParticipatingShardNames().isPresent());
583 assertEquals("Participating shards", expShardNames, readyLocalTx.getValue().getParticipatingShardNames().get());
587 public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
588 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
590 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
592 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
594 expectBatchedModificationsReady(actorRef, true);
596 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
598 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
600 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
602 assertTrue(ready instanceof SingleCommitCohortProxy);
604 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
606 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
607 assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
609 verifyBatchedModifications(batchedModifications.get(0), true, true,
610 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
614 public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
615 dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
616 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
618 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
620 expectBatchedModificationsReady(actorRef, true);
622 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
624 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
626 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
628 assertTrue(ready instanceof SingleCommitCohortProxy);
630 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
632 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
633 assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
635 verifyBatchedModifications(batchedModifications.get(0), false,
636 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
638 verifyBatchedModifications(batchedModifications.get(1), true, true);
642 public void testReadyWithReplyFailure() throws Exception {
643 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
645 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
647 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
649 expectFailedBatchedModifications(actorRef);
651 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
653 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
655 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
657 assertTrue(ready instanceof SingleCommitCohortProxy);
659 verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
663 public void testReadyWithDebugContextEnabled() throws Exception {
664 dataStoreContextBuilder.transactionDebugContextEnabled(true);
666 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
668 expectBatchedModificationsReady(actorRef, true);
670 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
672 transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
674 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
676 assertTrue(ready instanceof DebugThreePhaseCommitCohort);
678 verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
682 public void testReadyWithLocalTransaction() throws Exception {
683 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
685 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
686 .actorSelection(shardActorRef.path().toString());
688 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
689 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
691 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
693 expectReadyLocalTransaction(shardActorRef, true);
695 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
696 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
698 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
699 assertTrue(ready instanceof SingleCommitCohortProxy);
700 verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
702 ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
703 verify(mockActorContext).executeOperationAsync(
704 eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
705 assertEquals("Participating shards present", false,
706 readyLocalTx.getValue().getParticipatingShardNames().isPresent());
710 public void testReadyWithLocalTransactionWithFailure() throws Exception {
711 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
713 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
714 .actorSelection(shardActorRef.path().toString());
716 DataTree mockDataTree = createDataTree();
717 DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
718 doThrow(new RuntimeException("mock")).when(mockModification).ready();
720 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
721 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
723 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
725 expectReadyLocalTransaction(shardActorRef, true);
727 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
728 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
730 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
731 assertTrue(ready instanceof SingleCommitCohortProxy);
732 verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
735 private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) throws Exception {
736 doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
738 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
740 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
742 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
744 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
746 transactionProxy.delete(TestModel.TEST_PATH);
748 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
750 assertTrue(ready instanceof SingleCommitCohortProxy);
752 verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
756 public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
757 testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
761 public void testWriteOnlyTxWithNotInitializedException() throws Exception {
762 testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
766 public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
767 testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
771 public void testReadyWithInvalidReplyMessageType() throws Exception {
772 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
773 ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
775 ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
776 TestModel.JUNK_QNAME.getLocalName());
778 doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
779 eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
781 expectBatchedModificationsReady(actorRef2);
783 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
785 transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
786 transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
788 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
790 assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
792 verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
793 IllegalArgumentException.class);
797 public void testGetIdentifier() {
798 setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
799 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
801 Object id = transactionProxy.getIdentifier();
802 assertNotNull("getIdentifier returned null", id);
803 assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
807 public void testClose() throws Exception {
808 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
810 doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
811 eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
813 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
815 transactionProxy.read(TestModel.TEST_PATH);
817 transactionProxy.close();
819 verify(mockActorContext).sendOperationAsync(
820 eq(actorSelection(actorRef)), isA(CloseTransaction.class));
823 private interface TransactionProxyOperation {
824 void run(TransactionProxy transactionProxy);
827 private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
828 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
831 private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
832 return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
836 private void throttleOperation(final TransactionProxyOperation operation) {
837 throttleOperation(operation, 1, true);
840 private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
841 final boolean shardFound) {
842 throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
843 mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
846 private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
847 final boolean shardFound, final long expectedCompletionTime) {
848 ActorSystem actorSystem = getSystem();
849 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
851 // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
852 // we now allow one extra permit to be allowed for ready
853 doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
854 .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
855 .getDatastoreContext();
857 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
858 .actorSelection(shardActorRef.path().toString());
861 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
862 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
863 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
864 .findPrimaryShardAsync(eq("cars"));
867 doReturn(Futures.failed(new Exception("not found")))
868 .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
871 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
872 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
875 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
877 long start = System.nanoTime();
879 operation.run(transactionProxy);
881 long end = System.nanoTime();
883 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
884 expectedCompletionTime, end - start),
885 end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
889 private void completeOperation(final TransactionProxyOperation operation) {
890 completeOperation(operation, true);
893 private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
894 ActorSystem actorSystem = getSystem();
895 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
897 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
898 .actorSelection(shardActorRef.path().toString());
901 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
902 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
904 doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
905 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
908 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
909 String actorPath = txActorRef.path().toString();
910 CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
911 DataStoreVersions.CURRENT_VERSION);
913 doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
915 doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
916 eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
919 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
921 long start = System.nanoTime();
923 operation.run(transactionProxy);
925 long end = System.nanoTime();
927 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
928 .getOperationTimeoutInMillis());
929 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
930 expected, end - start), end - start <= expected);
933 private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
934 ActorSystem actorSystem = getSystem();
935 ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
937 doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
938 .actorSelection(shardActorRef.path().toString());
940 doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
941 .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
943 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
945 long start = System.nanoTime();
947 operation.run(transactionProxy);
949 long end = System.nanoTime();
951 long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
952 .getOperationTimeoutInMillis());
953 Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
954 end - start <= expected);
957 private static DataTree createDataTree() {
958 DataTree dataTree = mock(DataTree.class);
959 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
960 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
962 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
963 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
968 private static DataTree createDataTree(final NormalizedNode<?, ?> readResponse) {
969 DataTree dataTree = mock(DataTree.class);
970 DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
971 DataTreeModification dataTreeModification = mock(DataTreeModification.class);
973 doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
974 doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
975 doReturn(java.util.Optional.of(readResponse)).when(dataTreeModification).readNode(
976 any(YangInstanceIdentifier.class));
983 public void testWriteCompletionForLocalShard() {
984 completeOperationLocal(transactionProxy -> {
985 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
987 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
989 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
991 }, createDataTree());
995 public void testWriteThrottlingWhenShardFound() {
996 throttleOperation(transactionProxy -> {
997 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
999 expectIncompleteBatchedModifications();
1001 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1003 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1008 public void testWriteThrottlingWhenShardNotFound() {
1009 // Confirm that there is no throttling when the Shard is not found
1010 completeOperation(transactionProxy -> {
1011 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1013 expectBatchedModifications(2);
1015 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1017 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1024 public void testWriteCompletion() {
1025 completeOperation(transactionProxy -> {
1026 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1028 expectBatchedModifications(2);
1030 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1032 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1037 public void testMergeThrottlingWhenShardFound() {
1038 throttleOperation(transactionProxy -> {
1039 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1041 expectIncompleteBatchedModifications();
1043 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1045 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1050 public void testMergeThrottlingWhenShardNotFound() {
1051 completeOperation(transactionProxy -> {
1052 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1054 expectBatchedModifications(2);
1056 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1058 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1063 public void testMergeCompletion() {
1064 completeOperation(transactionProxy -> {
1065 NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1067 expectBatchedModifications(2);
1069 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1071 transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
1077 public void testMergeCompletionForLocalShard() {
1078 completeOperationLocal(transactionProxy -> {
1079 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1081 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1083 transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
1085 }, createDataTree());
1090 public void testDeleteThrottlingWhenShardFound() {
1092 throttleOperation(transactionProxy -> {
1093 expectIncompleteBatchedModifications();
1095 transactionProxy.delete(TestModel.TEST_PATH);
1097 transactionProxy.delete(TestModel.TEST_PATH);
1103 public void testDeleteThrottlingWhenShardNotFound() {
1105 completeOperation(transactionProxy -> {
1106 expectBatchedModifications(2);
1108 transactionProxy.delete(TestModel.TEST_PATH);
1110 transactionProxy.delete(TestModel.TEST_PATH);
1115 public void testDeleteCompletionForLocalShard() {
1116 completeOperationLocal(transactionProxy -> {
1118 transactionProxy.delete(TestModel.TEST_PATH);
1120 transactionProxy.delete(TestModel.TEST_PATH);
1121 }, createDataTree());
1126 public void testDeleteCompletion() {
1127 completeOperation(transactionProxy -> {
1128 expectBatchedModifications(2);
1130 transactionProxy.delete(TestModel.TEST_PATH);
1132 transactionProxy.delete(TestModel.TEST_PATH);
1138 public void testReadThrottlingWhenShardFound() {
1140 throttleOperation(transactionProxy -> {
1141 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1142 any(ActorSelection.class), eqReadData());
1144 transactionProxy.read(TestModel.TEST_PATH);
1146 transactionProxy.read(TestModel.TEST_PATH);
1151 public void testReadThrottlingWhenShardNotFound() {
1153 completeOperation(transactionProxy -> {
1154 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1155 any(ActorSelection.class), eqReadData());
1157 transactionProxy.read(TestModel.TEST_PATH);
1159 transactionProxy.read(TestModel.TEST_PATH);
1165 public void testReadCompletion() {
1166 completeOperation(transactionProxy -> {
1167 NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1169 doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
1170 any(ActorSelection.class), eqReadData(), any(Timeout.class));
1172 transactionProxy.read(TestModel.TEST_PATH);
1174 transactionProxy.read(TestModel.TEST_PATH);
1180 public void testReadCompletionForLocalShard() {
1181 final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1182 completeOperationLocal(transactionProxy -> {
1183 transactionProxy.read(TestModel.TEST_PATH);
1185 transactionProxy.read(TestModel.TEST_PATH);
1186 }, createDataTree(nodeToRead));
1191 public void testReadCompletionForLocalShardWhenExceptionOccurs() {
1192 completeOperationLocal(transactionProxy -> {
1193 transactionProxy.read(TestModel.TEST_PATH);
1195 transactionProxy.read(TestModel.TEST_PATH);
1196 }, createDataTree());
1201 public void testExistsThrottlingWhenShardFound() {
1203 throttleOperation(transactionProxy -> {
1204 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1205 any(ActorSelection.class), eqDataExists());
1207 transactionProxy.exists(TestModel.TEST_PATH);
1209 transactionProxy.exists(TestModel.TEST_PATH);
1214 public void testExistsThrottlingWhenShardNotFound() {
1216 completeOperation(transactionProxy -> {
1217 doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
1218 any(ActorSelection.class), eqDataExists());
1220 transactionProxy.exists(TestModel.TEST_PATH);
1222 transactionProxy.exists(TestModel.TEST_PATH);
1228 public void testExistsCompletion() {
1229 completeOperation(transactionProxy -> {
1230 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1231 any(ActorSelection.class), eqDataExists(), any(Timeout.class));
1233 transactionProxy.exists(TestModel.TEST_PATH);
1235 transactionProxy.exists(TestModel.TEST_PATH);
1241 public void testExistsCompletionForLocalShard() {
1242 final NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1243 completeOperationLocal(transactionProxy -> {
1244 transactionProxy.exists(TestModel.TEST_PATH);
1246 transactionProxy.exists(TestModel.TEST_PATH);
1247 }, createDataTree(nodeToRead));
1252 public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
1253 completeOperationLocal(transactionProxy -> {
1254 transactionProxy.exists(TestModel.TEST_PATH);
1256 transactionProxy.exists(TestModel.TEST_PATH);
1257 }, createDataTree());
1262 public void testReadyThrottling() {
1264 throttleOperation(transactionProxy -> {
1265 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1267 expectBatchedModifications(1);
1269 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1271 transactionProxy.ready();
1276 public void testReadyThrottlingWithTwoTransactionContexts() {
1277 throttleOperation(transactionProxy -> {
1278 NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1279 NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
1281 expectBatchedModifications(2);
1283 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
1285 // Trying to write to Cars will cause another transaction context to get created
1286 transactionProxy.write(CarsModel.BASE_PATH, carsNode);
1288 // Now ready should block for both transaction contexts
1289 transactionProxy.ready();
1290 }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
1291 .getOperationTimeoutInMillis()) * 2);
1294 private void testModificationOperationBatching(final TransactionType type) throws Exception {
1295 int shardBatchedModificationCount = 3;
1296 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1298 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
1300 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1302 YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1303 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1305 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1306 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1308 YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
1309 NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1311 YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1312 NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1314 YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
1315 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1317 YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
1318 NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1320 YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
1321 YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
1323 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
1325 transactionProxy.write(writePath1, writeNode1);
1326 transactionProxy.write(writePath2, writeNode2);
1327 transactionProxy.delete(deletePath1);
1328 transactionProxy.merge(mergePath1, mergeNode1);
1329 transactionProxy.merge(mergePath2, mergeNode2);
1330 transactionProxy.write(writePath3, writeNode3);
1331 transactionProxy.merge(mergePath3, mergeNode3);
1332 transactionProxy.delete(deletePath2);
1334 // This sends the last batch.
1335 transactionProxy.ready();
1337 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1338 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1340 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1341 new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
1343 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1344 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
1346 verifyBatchedModifications(batchedModifications.get(2), true, true,
1347 new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
1349 assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
1353 public void testReadWriteModificationOperationBatching() throws Exception {
1354 testModificationOperationBatching(READ_WRITE);
1358 public void testWriteOnlyModificationOperationBatching() throws Exception {
1359 testModificationOperationBatching(WRITE_ONLY);
1363 public void testOptimizedWriteOnlyModificationOperationBatching() throws Exception {
1364 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
1365 testModificationOperationBatching(WRITE_ONLY);
1369 public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
1371 int shardBatchedModificationCount = 10;
1372 dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
1374 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
1376 expectBatchedModifications(actorRef, shardBatchedModificationCount);
1378 final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
1379 final NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1381 YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
1382 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
1384 final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
1385 final NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1387 YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
1388 NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
1390 final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
1392 doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
1393 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1395 doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
1396 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1398 doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
1399 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1401 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
1403 transactionProxy.write(writePath1, writeNode1);
1404 transactionProxy.write(writePath2, writeNode2);
1406 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
1408 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1409 assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
1411 transactionProxy.merge(mergePath1, mergeNode1);
1412 transactionProxy.merge(mergePath2, mergeNode2);
1414 readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
1416 transactionProxy.delete(deletePath);
1418 Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
1419 assertEquals("Exists response", true, exists);
1421 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1422 assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
1424 List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
1425 assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
1427 verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
1428 new WriteModification(writePath2, writeNode2));
1430 verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
1431 new MergeModification(mergePath2, mergeNode2));
1433 verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
1435 InOrder inOrder = Mockito.inOrder(mockActorContext);
1436 inOrder.verify(mockActorContext).executeOperationAsync(
1437 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1439 inOrder.verify(mockActorContext).executeOperationAsync(
1440 eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
1442 inOrder.verify(mockActorContext).executeOperationAsync(
1443 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1445 inOrder.verify(mockActorContext).executeOperationAsync(
1446 eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
1448 inOrder.verify(mockActorContext).executeOperationAsync(
1449 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
1451 inOrder.verify(mockActorContext).executeOperationAsync(
1452 eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
1456 public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException,
1457 java.util.concurrent.TimeoutException {
1458 SchemaContext schemaContext = SchemaContextHelper.full();
1459 Configuration configuration = mock(Configuration.class);
1460 doReturn(configuration).when(mockActorContext).getConfiguration();
1461 doReturn(schemaContext).when(mockActorContext).getSchemaContext();
1462 doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
1464 NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1465 NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
1467 setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
1468 setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
1470 doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
1472 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
1474 TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
1476 Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
1477 YangInstanceIdentifier.EMPTY).get(5, TimeUnit.SECONDS);
1479 assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
1481 NormalizedNode<?, ?> normalizedNode = readOptional.get();
1483 assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
1485 @SuppressWarnings("unchecked")
1486 Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
1488 for (NormalizedNode<?,?> node : collection) {
1489 assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
1492 assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
1493 NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
1495 assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
1497 assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
1498 NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
1500 assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
1504 private void setUpReadData(final String shardName, final NormalizedNode<?, ?> expectedNode) {
1505 ActorSystem actorSystem = getSystem();
1506 ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
1508 doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
1509 .actorSelection(shardActorRef.path().toString());
1511 doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
1512 .findPrimaryShardAsync(eq(shardName));
1514 ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
1516 doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
1517 .actorSelection(txActorRef.path().toString());
1519 doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
1520 .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
1521 eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
1523 doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
1524 eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.EMPTY), any(Timeout.class));