Optimize TransactionProxy for write-only transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxyTest.java
1 /*
2  *
3  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore;
12
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.anyObject;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.timeout;
22 import static org.mockito.Mockito.times;
23 import static org.mockito.Mockito.verify;
24 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
25 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
26 import akka.actor.ActorRef;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.Assert;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
35 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
36 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
45 import scala.concurrent.Promise;
46
47 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
48
49     @SuppressWarnings("resource")
50     @Test
51     public void testNewReadOnlyTransaction() throws Exception {
52
53      DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction();
54          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
55
56     }
57
58     @SuppressWarnings("resource")
59     @Test
60     public void testNewReadWriteTransaction() throws Exception {
61         DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction();
62         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
63
64     }
65
66     @SuppressWarnings("resource")
67     @Test
68     public void testNewWriteOnlyTransaction() throws Exception {
69         DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction();
70         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
71
72     }
73
74     @Test
75     public void testClose() throws Exception {
76         new TransactionChainProxy(mockActorContext).close();
77
78         verify(mockActorContext, times(1)).broadcast(anyObject());
79     }
80
81     @Test
82     public void testTransactionChainsHaveUniqueId(){
83         TransactionChainProxy one = new TransactionChainProxy(mock(ActorContext.class));
84         TransactionChainProxy two = new TransactionChainProxy(mock(ActorContext.class));
85
86         Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
87     }
88
89     @Test
90     public void testRateLimitingUsedInReadWriteTxCreation(){
91         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
92
93         txChainProxy.newReadWriteTransaction();
94
95         verify(mockActorContext, times(1)).acquireTxCreationPermit();
96     }
97
98     @Test
99     public void testRateLimitingUsedInWriteOnlyTxCreation(){
100         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
101
102         txChainProxy.newWriteOnlyTransaction();
103
104         verify(mockActorContext, times(1)).acquireTxCreationPermit();
105     }
106
107
108     @Test
109     public void testRateLimitingNotUsedInReadOnlyTxCreation(){
110         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
111
112         txChainProxy.newReadOnlyTransaction();
113
114         verify(mockActorContext, times(0)).acquireTxCreationPermit();
115     }
116
117     /**
118      * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
119      * initiated until the first one completes its read future.
120      */
121     @Test
122     public void testChainedWriteOnlyTransactions() throws Exception {
123         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
124
125         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
126
127         ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
128
129         Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
130         doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
131                 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
132
133         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
134
135         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
136         writeTx1.write(TestModel.TEST_PATH, writeNode1);
137
138         writeTx1.ready();
139
140         verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
141
142         verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
143
144         ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
145
146         expectBatchedModifications(txActorRef2, 1);
147
148         final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
149
150         final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
151
152         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
153         final CountDownLatch write2Complete = new CountDownLatch(1);
154         new Thread() {
155             @Override
156             public void run() {
157                 try {
158                     writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
159                 } catch (Exception e) {
160                     caughtEx.set(e);
161                 } finally {
162                     write2Complete.countDown();
163                 }
164             }
165         }.start();
166
167         assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
168
169         if(caughtEx.get() != null) {
170             throw caughtEx.get();
171         }
172
173         try {
174             verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
175         } catch (AssertionError e) {
176             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
177         }
178
179         batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString()));
180
181         // Tx 2 should've proceeded to find the primary shard.
182         verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
183     }
184
185     /**
186      * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
187      * initiated until the first one completes its read future.
188      */
189     @Test
190     public void testChainedReadWriteTransactions() throws Exception {
191         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
192
193         ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
194
195         expectBatchedModifications(txActorRef1, 1);
196
197         Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
198         doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
199                 eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
200
201         DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
202
203         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
204         writeTx1.write(TestModel.TEST_PATH, writeNode1);
205
206         writeTx1.ready();
207
208         verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false);
209
210         String tx2MemberName = "tx2MemberName";
211         doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
212         ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
213         ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
214                 DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
215
216         expectBatchedModifications(txActorRef2, 1);
217
218         final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
219
220         final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
221
222         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
223         final CountDownLatch write2Complete = new CountDownLatch(1);
224         new Thread() {
225             @Override
226             public void run() {
227                 try {
228                     writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
229                 } catch (Exception e) {
230                     caughtEx.set(e);
231                 } finally {
232                     write2Complete.countDown();
233                 }
234             }
235         }.start();
236
237         assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
238
239         if(caughtEx.get() != null) {
240             throw caughtEx.get();
241         }
242
243         try {
244             verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
245                     eqCreateTransaction(tx2MemberName, READ_WRITE));
246         } catch (AssertionError e) {
247             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
248         }
249
250         readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
251
252         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
253                 eqCreateTransaction(tx2MemberName, READ_WRITE));
254     }
255
256     @Test(expected=IllegalStateException.class)
257     public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
258         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
259
260         expectBatchedModifications(actorRef, 1);
261
262         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
263
264         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
265
266         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
267         writeTx1.write(TestModel.TEST_PATH, writeNode1);
268
269         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
270
271         txChainProxy.newWriteOnlyTransaction();
272     }
273 }