CDS: Retry remote front-end transactions on AskTimeoutException
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxyTest.java
1 /*
2  *
3  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore;
12
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.anyObject;
17 import static org.mockito.Matchers.eq;
18 import static org.mockito.Matchers.isA;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.timeout;
22 import static org.mockito.Mockito.times;
23 import static org.mockito.Mockito.verify;
24 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
25 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
26 import akka.actor.ActorRef;
27 import akka.util.Timeout;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicReference;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
34 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
35 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
36 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import scala.concurrent.Promise;
44
45 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
46
47     @SuppressWarnings("resource")
48     @Test
49     public void testNewReadOnlyTransaction() throws Exception {
50
51      DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadOnlyTransaction();
52          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
53
54     }
55
56     @SuppressWarnings("resource")
57     @Test
58     public void testNewReadWriteTransaction() throws Exception {
59         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadWriteTransaction();
60         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
61
62     }
63
64     @SuppressWarnings("resource")
65     @Test
66     public void testNewWriteOnlyTransaction() throws Exception {
67         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newWriteOnlyTransaction();
68         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
69
70     }
71
72     @Test
73     public void testClose() throws Exception {
74         new TransactionChainProxy(mockComponentFactory).close();
75
76         verify(mockActorContext, times(1)).broadcast(anyObject());
77     }
78
79     @Test
80     public void testTransactionChainsHaveUniqueId(){
81         TransactionChainProxy one = new TransactionChainProxy(mockComponentFactory);
82         TransactionChainProxy two = new TransactionChainProxy(mockComponentFactory);
83
84         Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
85     }
86
87     @Test
88     public void testRateLimitingUsedInReadWriteTxCreation(){
89         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
90
91         txChainProxy.newReadWriteTransaction();
92
93         verify(mockActorContext, times(1)).acquireTxCreationPermit();
94     }
95
96     @Test
97     public void testRateLimitingUsedInWriteOnlyTxCreation(){
98         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
99
100         txChainProxy.newWriteOnlyTransaction();
101
102         verify(mockActorContext, times(1)).acquireTxCreationPermit();
103     }
104
105
106     @Test
107     public void testRateLimitingNotUsedInReadOnlyTxCreation(){
108         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
109
110         txChainProxy.newReadOnlyTransaction();
111
112         verify(mockActorContext, times(0)).acquireTxCreationPermit();
113     }
114
115     /**
116      * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
117      * initiated until the first one completes its read future.
118      */
119     @Test
120     public void testChainedWriteOnlyTransactions() throws Exception {
121         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
122
123         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
124
125         ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
126
127         Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
128         doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
129                 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
130
131         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
132
133         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
134         writeTx1.write(TestModel.TEST_PATH, writeNode1);
135
136         writeTx1.ready();
137
138         verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
139
140         verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
141
142         ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
143
144         expectBatchedModifications(txActorRef2, 1);
145
146         final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
147
148         final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
149
150         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
151         final CountDownLatch write2Complete = new CountDownLatch(1);
152         new Thread() {
153             @Override
154             public void run() {
155                 try {
156                     writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
157                 } catch (Exception e) {
158                     caughtEx.set(e);
159                 } finally {
160                     write2Complete.countDown();
161                 }
162             }
163         }.start();
164
165         assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
166
167         if(caughtEx.get() != null) {
168             throw caughtEx.get();
169         }
170
171         try {
172             verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
173         } catch (AssertionError e) {
174             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
175         }
176
177         batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
178
179         // Tx 2 should've proceeded to find the primary shard.
180         verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
181     }
182
183     /**
184      * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
185      * initiated until the first one completes its read future.
186      */
187     @Test
188     public void testChainedReadWriteTransactions() throws Exception {
189         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
190
191         ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
192
193         expectBatchedModifications(txActorRef1, 1);
194
195         Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
196         doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
197                 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
198
199         DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
200
201         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
202         writeTx1.write(TestModel.TEST_PATH, writeNode1);
203
204         writeTx1.ready();
205
206         verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
207
208         String tx2MemberName = "mock-member";
209         ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
210         ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
211                 DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
212
213         expectBatchedModifications(txActorRef2, 1);
214
215         final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
216
217         final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
218
219         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
220         final CountDownLatch write2Complete = new CountDownLatch(1);
221         new Thread() {
222             @Override
223             public void run() {
224                 try {
225                     writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
226                 } catch (Exception e) {
227                     caughtEx.set(e);
228                 } finally {
229                     write2Complete.countDown();
230                 }
231             }
232         }.start();
233
234         assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
235
236         if(caughtEx.get() != null) {
237             throw caughtEx.get();
238         }
239
240         try {
241             verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
242                     eqCreateTransaction(tx2MemberName, READ_WRITE));
243         } catch (AssertionError e) {
244             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
245         }
246
247         readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
248
249         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
250                 eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class));
251     }
252
253     @Test(expected=IllegalStateException.class)
254     public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
255         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
256
257         expectBatchedModifications(actorRef, 1);
258
259         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
260
261         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
262
263         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
264         writeTx1.write(TestModel.TEST_PATH, writeNode1);
265
266         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
267
268         txChainProxy.newWriteOnlyTransaction();
269     }
270 }