CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxyTest.java
1 /*
2  *
3  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore;
12
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.anyObject;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.timeout;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
25 import akka.actor.ActorRef;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicReference;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
32 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
33 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
34 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
41 import scala.concurrent.Promise;
42
43 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
44
45     @SuppressWarnings("resource")
46     @Test
47     public void testNewReadOnlyTransaction() throws Exception {
48
49      DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadOnlyTransaction();
50          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
51
52     }
53
54     @SuppressWarnings("resource")
55     @Test
56     public void testNewReadWriteTransaction() throws Exception {
57         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadWriteTransaction();
58         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
59
60     }
61
62     @SuppressWarnings("resource")
63     @Test
64     public void testNewWriteOnlyTransaction() throws Exception {
65         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newWriteOnlyTransaction();
66         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
67
68     }
69
70     @Test
71     public void testClose() throws Exception {
72         new TransactionChainProxy(mockComponentFactory).close();
73
74         verify(mockActorContext, times(1)).broadcast(anyObject());
75     }
76
77     @Test
78     public void testTransactionChainsHaveUniqueId(){
79         TransactionChainProxy one = new TransactionChainProxy(mockComponentFactory);
80         TransactionChainProxy two = new TransactionChainProxy(mockComponentFactory);
81
82         Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
83     }
84
85     @Test
86     public void testRateLimitingUsedInReadWriteTxCreation(){
87         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
88
89         txChainProxy.newReadWriteTransaction();
90
91         verify(mockActorContext, times(1)).acquireTxCreationPermit();
92     }
93
94     @Test
95     public void testRateLimitingUsedInWriteOnlyTxCreation(){
96         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
97
98         txChainProxy.newWriteOnlyTransaction();
99
100         verify(mockActorContext, times(1)).acquireTxCreationPermit();
101     }
102
103
104     @Test
105     public void testRateLimitingNotUsedInReadOnlyTxCreation(){
106         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
107
108         txChainProxy.newReadOnlyTransaction();
109
110         verify(mockActorContext, times(0)).acquireTxCreationPermit();
111     }
112
113     /**
114      * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
115      * initiated until the first one completes its read future.
116      */
117     @Test
118     public void testChainedWriteOnlyTransactions() throws Exception {
119         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
120
121         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
122
123         ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
124
125         Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
126         doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
127                 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
128
129         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
130
131         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
132         writeTx1.write(TestModel.TEST_PATH, writeNode1);
133
134         writeTx1.ready();
135
136         verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
137
138         verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
139
140         ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
141
142         expectBatchedModifications(txActorRef2, 1);
143
144         final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
145
146         final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
147
148         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
149         final CountDownLatch write2Complete = new CountDownLatch(1);
150         new Thread() {
151             @Override
152             public void run() {
153                 try {
154                     writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
155                 } catch (Exception e) {
156                     caughtEx.set(e);
157                 } finally {
158                     write2Complete.countDown();
159                 }
160             }
161         }.start();
162
163         assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
164
165         if(caughtEx.get() != null) {
166             throw caughtEx.get();
167         }
168
169         try {
170             verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
171         } catch (AssertionError e) {
172             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
173         }
174
175         batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
176
177         // Tx 2 should've proceeded to find the primary shard.
178         verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
179     }
180
181     /**
182      * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
183      * initiated until the first one completes its read future.
184      */
185     @Test
186     public void testChainedReadWriteTransactions() throws Exception {
187         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
188
189         ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
190
191         expectBatchedModifications(txActorRef1, 1);
192
193         Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
194         doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
195                 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
196
197         DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
198
199         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
200         writeTx1.write(TestModel.TEST_PATH, writeNode1);
201
202         writeTx1.ready();
203
204         verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
205
206         String tx2MemberName = "tx2MemberName";
207         doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
208         ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
209         ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
210                 DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
211
212         expectBatchedModifications(txActorRef2, 1);
213
214         final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
215
216         final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
217
218         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
219         final CountDownLatch write2Complete = new CountDownLatch(1);
220         new Thread() {
221             @Override
222             public void run() {
223                 try {
224                     writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
225                 } catch (Exception e) {
226                     caughtEx.set(e);
227                 } finally {
228                     write2Complete.countDown();
229                 }
230             }
231         }.start();
232
233         assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
234
235         if(caughtEx.get() != null) {
236             throw caughtEx.get();
237         }
238
239         try {
240             verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
241                     eqCreateTransaction(tx2MemberName, READ_WRITE));
242         } catch (AssertionError e) {
243             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
244         }
245
246         readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
247
248         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
249                 eqCreateTransaction(tx2MemberName, READ_WRITE));
250     }
251
252     @Test(expected=IllegalStateException.class)
253     public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
254         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
255
256         expectBatchedModifications(actorRef, 1);
257
258         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
259
260         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
261
262         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
263         writeTx1.write(TestModel.TEST_PATH, writeNode1);
264
265         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
266
267         txChainProxy.newWriteOnlyTransaction();
268     }
269 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.