3 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 package org.opendaylight.controller.cluster.datastore;
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.anyObject;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.timeout;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25 import akka.actor.ActorRef;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicReference;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
32 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
33 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
34 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
41 import scala.concurrent.Promise;
43 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
/**
 * Checks that a chain proxy built from mockComponentFactory hands out a read-only
 * transaction whose runtime type is a DOMStoreReadTransaction.
 */
45 @SuppressWarnings("resource")
47 public void testNewReadOnlyTransaction() throws Exception {
// The proxy is created inline and is not closed in the visible lines; presumably that is
// the reason for the resource-warning suppression on this method.
49 DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadOnlyTransaction();
50 Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
/**
 * Checks that a chain proxy built from mockComponentFactory hands out a read-write
 * transaction whose runtime type is a DOMStoreReadWriteTransaction.
 */
54 @SuppressWarnings("resource")
56 public void testNewReadWriteTransaction() throws Exception {
// The proxy is created inline and is not closed in the visible lines; presumably that is
// the reason for the resource-warning suppression on this method.
57 DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadWriteTransaction();
58 Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
/**
 * Checks that a chain proxy built from mockComponentFactory hands out a write-only
 * transaction whose runtime type is a DOMStoreWriteTransaction.
 */
62 @SuppressWarnings("resource")
64 public void testNewWriteOnlyTransaction() throws Exception {
// The proxy is created inline and is not closed in the visible lines; presumably that is
// the reason for the resource-warning suppression on this method.
65 DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newWriteOnlyTransaction();
66 Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
/**
 * Closes a freshly created chain proxy and checks that the mocked actor context
 * received exactly one broadcast call.
 */
71 public void testClose() throws Exception {
72 new TransactionChainProxy(mockComponentFactory).close();
// Only the number of broadcast calls is checked; anyObject() accepts whatever was passed.
74 verify(mockActorContext, times(1)).broadcast(anyObject());
/**
 * Creates two chain proxies from the same component factory and checks that their
 * transaction chain ids are not equal.
 */
78 public void testTransactionChainsHaveUniqueId(){
79 TransactionChainProxy one = new TransactionChainProxy(mockComponentFactory);
80 TransactionChainProxy two = new TransactionChainProxy(mockComponentFactory);
82 Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
/**
 * Checks that creating one read-write transaction on the chain results in exactly one
 * acquireTxCreationPermit() call on the mocked actor context.
 */
86 public void testRateLimitingUsedInReadWriteTxCreation(){
87 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
// The returned transaction is intentionally ignored; only the permit call is of interest.
89 txChainProxy.newReadWriteTransaction();
91 verify(mockActorContext, times(1)).acquireTxCreationPermit();
/**
 * Checks that creating one write-only transaction on the chain results in exactly one
 * acquireTxCreationPermit() call on the mocked actor context.
 */
95 public void testRateLimitingUsedInWriteOnlyTxCreation(){
96 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
// The returned transaction is intentionally ignored; only the permit call is of interest.
98 txChainProxy.newWriteOnlyTransaction();
100 verify(mockActorContext, times(1)).acquireTxCreationPermit();
/**
 * Checks that creating a read-only transaction on the chain does not call
 * acquireTxCreationPermit() on the mocked actor context.
 */
105 public void testRateLimitingNotUsedInReadOnlyTxCreation(){
106 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
// The returned transaction is intentionally ignored; only the absence of a permit call matters.
108 txChainProxy.newReadOnlyTransaction();
// times(0) asserts that no permit was requested at all.
110 verify(mockActorContext, times(0)).acquireTxCreationPermit();
114 * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
115 * initiated until the first one completes its ready future.
118 public void testChainedWriteOnlyTransactions() throws Exception {
// NOTE(review): the embedded line numbers in this method have gaps (for example 132 to 136,
// 149 to 154, 158 to 163, 166 to 170), so some statements are not visible in this excerpt.
// Comments below describe only the visible statements.
119 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
121 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
123 ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
// The reply to BatchedModifications sent to txActorRef1 is backed by a promise that this test
// completes by hand further down, so the test controls when the Tx 1 reply arrives.
125 Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
126 doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
127 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
129 DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
131 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
132 writeTx1.write(TestModel.TEST_PATH, writeNode1);
// Baseline: exactly one primary shard lookup for the default shard so far.
136 verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
// NOTE(review): the trailing boolean is presumably the expected ready flag of the batched
// modification - confirm against verifyOneBatchedModification in the base class.
138 verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
// Second transaction actor, used by Tx 2.
140 ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
142 expectBatchedModifications(txActorRef2, 1);
144 final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
146 final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
// caughtEx carries an exception out of the Tx 2 write; the latch signals that the write returned.
148 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
149 final CountDownLatch write2Complete = new CountDownLatch(1);
// NOTE(review): the construct that opens this try block is not visible here; given the latch
// and the final locals, the write presumably runs on a separate thread - confirm in the full file.
154 writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
155 } catch (Exception e) {
158 write2Complete.countDown();
// Wait at most 5 seconds for the Tx 2 write call to return.
163 assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
// Surface any exception captured from the Tx 2 write as the test failure.
165 if(caughtEx.get() != null) {
166 throw caughtEx.get();
// The lookup count must still be 1 while the Tx 1 reply promise is incomplete; a verification
// failure is replaced by a fail() call with a more descriptive message.
170 verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
171 } catch (AssertionError e) {
172 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
// Complete the Tx 1 reply promise with the value taken from the readyTxReply(...) future.
175 batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
177 // Tx 2 should've proceeded to find the primary shard.
// The verification allows up to 5000 ms for the second lookup to be recorded.
178 verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
182 * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
183 * initiated until the first one completes its ready future.
186 public void testChainedReadWriteTransactions() throws Exception {
// NOTE(review): the embedded line numbers in this method have gaps (for example 200 to 204,
// 219 to 224, 228 to 233, 236 to 240), so some statements are not visible in this excerpt.
// Comments below describe only the visible statements.
187 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
189 ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
191 expectBatchedModifications(txActorRef1, 1);
// The reply to BatchedModifications sent to txActorRef1 is backed by a promise that this test
// completes by hand further down. NOTE(review): this stubbing follows expectBatchedModifications
// for the same actor and presumably takes precedence over it - confirm.
193 Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
194 doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
195 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
197 DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
199 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
200 writeTx1.write(TestModel.TEST_PATH, writeNode1);
// NOTE(review): the trailing boolean is presumably the expected ready flag of the batched
// modification - confirm against verifyOneBatchedModification in the base class.
204 verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
// Tx 2 gets its own member name, shard actor and transaction actor so that its create
// transaction message can be matched separately from the one sent for Tx 1.
206 String tx2MemberName = "tx2MemberName";
207 doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
208 ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
209 ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
210 DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
212 expectBatchedModifications(txActorRef2, 1);
214 final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
216 final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
// caughtEx carries an exception out of the Tx 2 write; the latch signals that the write returned.
218 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
219 final CountDownLatch write2Complete = new CountDownLatch(1);
// NOTE(review): the construct that opens this try block is not visible here; given the latch
// and the final locals, the write presumably runs on a separate thread - confirm in the full file.
224 writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
225 } catch (Exception e) {
228 write2Complete.countDown();
// Wait at most 5 seconds for the Tx 2 write call to return.
233 assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
// Surface any exception captured from the Tx 2 write as the test failure.
235 if(caughtEx.get() != null) {
236 throw caughtEx.get();
// While the Tx 1 reply promise is incomplete, no create transaction message for Tx 2 may have
// been sent to shardActorRef2; a verification failure is replaced by a fail() call with a
// more descriptive message.
240 verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
241 eqCreateTransaction(tx2MemberName, READ_WRITE));
242 } catch (AssertionError e) {
243 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
// Complete the Tx 1 reply promise with the value taken from the readyTxReply(...) future.
246 readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
// The create transaction message for Tx 2 is now expected; the verification allows up to 5000 ms.
248 verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
249 eqCreateTransaction(tx2MemberName, READ_WRITE));
/**
 * Expects an IllegalStateException when a second write-only transaction is requested from
 * the chain after the first one has been written to; no ready call on the first transaction
 * appears in the visible lines.
 */
252 @Test(expected=IllegalStateException.class)
253 public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
254 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
256 expectBatchedModifications(actorRef, 1);
258 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
260 DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
262 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
263 writeTx1.write(TestModel.TEST_PATH, writeNode1);
// NOTE(review): writeNode2 is assigned but not used in the visible lines of this method.
265 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
// This is the last visible statement, so it is presumably the call expected to throw.
267 txChainProxy.newWriteOnlyTransaction();