Merge "Stop swallowing exceptions in XmlElement optional methods."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxyTest.java
1 /*
2  *
3  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore;
12
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.anyObject;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.timeout;
22 import static org.mockito.Mockito.times;
23 import static org.mockito.Mockito.verify;
24 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
25 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
26 import akka.actor.ActorRef;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.Assert;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
33 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
34 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
36 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import scala.concurrent.Promise;
44
45 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
46
47     @SuppressWarnings("resource")
48     @Test
49     public void testNewReadOnlyTransaction() throws Exception {
50
51      DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction();
52          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
53
54     }
55
56     @SuppressWarnings("resource")
57     @Test
58     public void testNewReadWriteTransaction() throws Exception {
59         DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction();
60         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
61
62     }
63
64     @SuppressWarnings("resource")
65     @Test
66     public void testNewWriteOnlyTransaction() throws Exception {
67         DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction();
68         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
69
70     }
71
72     @Test
73     public void testClose() throws Exception {
74         new TransactionChainProxy(mockActorContext).close();
75
76         verify(mockActorContext, times(1)).broadcast(anyObject());
77     }
78
79     @Test
80     public void testTransactionChainsHaveUniqueId(){
81         TransactionChainProxy one = new TransactionChainProxy(mock(ActorContext.class));
82         TransactionChainProxy two = new TransactionChainProxy(mock(ActorContext.class));
83
84         Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
85     }
86
87     @Test
88     public void testRateLimitingUsedInReadWriteTxCreation(){
89         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
90
91         txChainProxy.newReadWriteTransaction();
92
93         verify(mockActorContext, times(1)).acquireTxCreationPermit();
94     }
95
96     @Test
97     public void testRateLimitingUsedInWriteOnlyTxCreation(){
98         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
99
100         txChainProxy.newWriteOnlyTransaction();
101
102         verify(mockActorContext, times(1)).acquireTxCreationPermit();
103     }
104
105
106     @Test
107     public void testRateLimitingNotUsedInReadOnlyTxCreation(){
108         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
109
110         txChainProxy.newReadOnlyTransaction();
111
112         verify(mockActorContext, times(0)).acquireTxCreationPermit();
113     }
114
115     /**
116      * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
117      * initiated until the first one completes its read future.
118      */
119     @Test
120     public void testChainedWriteOnlyTransactions() throws Exception {
121         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
122
123         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
124
125         ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
126
127         Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
128         doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
129                 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
130
131         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
132
133         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
134         writeTx1.write(TestModel.TEST_PATH, writeNode1);
135
136         writeTx1.ready();
137
138         verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
139
140         verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
141
142         ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
143
144         expectBatchedModifications(txActorRef2, 1);
145
146         final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
147
148         final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
149
150         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
151         final CountDownLatch write2Complete = new CountDownLatch(1);
152         new Thread() {
153             @Override
154             public void run() {
155                 try {
156                     writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
157                 } catch (Exception e) {
158                     caughtEx.set(e);
159                 } finally {
160                     write2Complete.countDown();
161                 }
162             }
163         }.start();
164
165         assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
166
167         if(caughtEx.get() != null) {
168             throw caughtEx.get();
169         }
170
171         try {
172             verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
173         } catch (AssertionError e) {
174             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
175         }
176
177         batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
178
179         // Tx 2 should've proceeded to find the primary shard.
180         verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
181     }
182
183     /**
184      * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
185      * initiated until the first one completes its read future.
186      */
187     @Test
188     public void testChainedReadWriteTransactions() throws Exception {
189         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
190
191         ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
192
193         expectBatchedModifications(txActorRef1, 1);
194
195         Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
196         doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
197                 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
198
199         DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
200
201         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
202         writeTx1.write(TestModel.TEST_PATH, writeNode1);
203
204         writeTx1.ready();
205
206         verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
207
208         String tx2MemberName = "tx2MemberName";
209         doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
210         ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
211         ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
212                 DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
213
214         expectBatchedModifications(txActorRef2, 1);
215
216         final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
217
218         final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
219
220         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
221         final CountDownLatch write2Complete = new CountDownLatch(1);
222         new Thread() {
223             @Override
224             public void run() {
225                 try {
226                     writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
227                 } catch (Exception e) {
228                     caughtEx.set(e);
229                 } finally {
230                     write2Complete.countDown();
231                 }
232             }
233         }.start();
234
235         assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
236
237         if(caughtEx.get() != null) {
238             throw caughtEx.get();
239         }
240
241         try {
242             verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
243                     eqCreateTransaction(tx2MemberName, READ_WRITE));
244         } catch (AssertionError e) {
245             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
246         }
247
248         readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
249
250         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
251                 eqCreateTransaction(tx2MemberName, READ_WRITE));
252     }
253
254     @Test(expected=IllegalStateException.class)
255     public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
256         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
257
258         expectBatchedModifications(actorRef, 1);
259
260         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
261
262         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
263
264         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
265         writeTx1.write(TestModel.TEST_PATH, writeNode1);
266
267         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
268
269         txChainProxy.newWriteOnlyTransaction();
270     }
271 }