2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.ArgumentMatchers.eq;
15 import static org.mockito.ArgumentMatchers.isA;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
24 import akka.actor.ActorRef;
25 import akka.util.Timeout;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicReference;
29 import java.util.function.Function;
30 import org.junit.Assert;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
34 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
35 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
36 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
37 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
38 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
39 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction;
40 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import scala.concurrent.Promise;
45 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
46 private LocalHistoryIdentifier historyId;
51 historyId = MockIdentifiers.historyIdentifier(TransactionChainProxyTest.class, memberName);
54 @SuppressWarnings("resource")
56 public void testNewReadOnlyTransaction() {
58 DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadOnlyTransaction();
59 Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
63 @SuppressWarnings("resource")
65 public void testNewReadWriteTransaction() {
66 DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadWriteTransaction();
67 Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
71 @SuppressWarnings("resource")
73 public void testNewWriteOnlyTransaction() {
74 DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newWriteOnlyTransaction();
75 Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
79 @SuppressWarnings("unchecked")
81 public void testClose() {
82 new TransactionChainProxy(mockComponentFactory, historyId).close();
84 verify(mockActorContext, times(1)).broadcast(any(Function.class), any(Class.class));
88 public void testRateLimitingUsedInReadWriteTxCreation() {
89 try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
91 txChainProxy.newReadWriteTransaction();
93 verify(mockActorContext, times(1)).acquireTxCreationPermit();
98 public void testRateLimitingUsedInWriteOnlyTxCreation() {
99 try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
101 txChainProxy.newWriteOnlyTransaction();
103 verify(mockActorContext, times(1)).acquireTxCreationPermit();
108 public void testRateLimitingNotUsedInReadOnlyTxCreation() {
109 try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
111 txChainProxy.newReadOnlyTransaction();
113 verify(mockActorContext, times(0)).acquireTxCreationPermit();
118 * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
119 * initiated until the first one completes its read future.
122 @SuppressWarnings("checkstyle:IllegalCatch")
123 public void testChainedWriteOnlyTransactions() throws Exception {
124 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
126 try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
128 ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
130 Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
131 doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
132 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
134 DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
136 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
137 writeTx1.write(TestModel.TEST_PATH, writeNode1);
141 verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
143 verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
145 ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
147 expectBatchedModifications(txActorRef2, 1);
149 final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
151 final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
153 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
154 final CountDownLatch write2Complete = new CountDownLatch(1);
157 writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
158 } catch (Exception e) {
161 write2Complete.countDown();
165 assertTrue("Tx 2 write should've completed", write2Complete.await(5, TimeUnit.SECONDS));
167 if (caughtEx.get() != null) {
168 throw caughtEx.get();
172 verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
173 } catch (AssertionError e) {
174 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
177 batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
179 // Tx 2 should've proceeded to find the primary shard.
180 verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(
181 eq(DefaultShardStrategy.DEFAULT_SHARD));
186 * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
187 * initiated until the first one completes its read future.
190 @SuppressWarnings("checkstyle:IllegalCatch")
191 public void testChainedReadWriteTransactions() throws Exception {
192 try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
194 ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
196 expectBatchedModifications(txActorRef1, 1);
198 Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
199 doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
200 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
202 DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
204 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
205 writeTx1.write(TestModel.TEST_PATH, writeNode1);
209 verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
211 String tx2MemberName = "mock-member";
212 ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
213 ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
214 DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
216 expectBatchedModifications(txActorRef2, 1);
218 final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
220 final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
222 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
223 final CountDownLatch write2Complete = new CountDownLatch(1);
226 writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
227 } catch (Exception e) {
230 write2Complete.countDown();
234 assertTrue("Tx 2 write should've completed", write2Complete.await(5, TimeUnit.SECONDS));
236 if (caughtEx.get() != null) {
237 throw caughtEx.get();
241 verify(mockActorContext, never()).executeOperationAsync(
242 eq(getSystem().actorSelection(shardActorRef2.path())),
243 eqCreateTransaction(tx2MemberName, READ_WRITE));
244 } catch (AssertionError e) {
245 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
248 readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
250 verify(mockActorContext, timeout(5000)).executeOperationAsync(
251 eq(getSystem().actorSelection(shardActorRef2.path())),
252 eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class));
256 @Test(expected = IllegalStateException.class)
257 public void testChainedWriteTransactionsWithPreviousTxNotReady() {
258 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
260 expectBatchedModifications(actorRef, 1);
262 try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
264 DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
266 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
267 writeTx1.write(TestModel.TEST_PATH, writeNode1);
269 txChainProxy.newWriteOnlyTransaction();