Do not use MoreExecutors.sameThreadExecutor()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyObject;
15 import static org.mockito.Matchers.eq;
16 import static org.mockito.Matchers.isA;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.timeout;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
23 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
24
25 import akka.actor.ActorRef;
26 import akka.util.Timeout;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.Assert;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
33 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
34 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
35 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
42 import scala.concurrent.Promise;
43
44 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
45
46     @SuppressWarnings("resource")
47     @Test
48     public void testNewReadOnlyTransaction() throws Exception {
49
50         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadOnlyTransaction();
51         Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
52
53     }
54
55     @SuppressWarnings("resource")
56     @Test
57     public void testNewReadWriteTransaction() throws Exception {
58         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadWriteTransaction();
59         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
60
61     }
62
63     @SuppressWarnings("resource")
64     @Test
65     public void testNewWriteOnlyTransaction() throws Exception {
66         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newWriteOnlyTransaction();
67         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
68
69     }
70
71     @Test
72     public void testClose() throws Exception {
73         new TransactionChainProxy(mockComponentFactory).close();
74
75         verify(mockActorContext, times(1)).broadcast(anyObject());
76     }
77
78     @Test
79     public void testTransactionChainsHaveUniqueId() {
80         try (TransactionChainProxy one = new TransactionChainProxy(mockComponentFactory)) {
81             try (TransactionChainProxy two = new TransactionChainProxy(mockComponentFactory)) {
82
83                 Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
84             }
85         }
86     }
87
88     @Test
89     public void testRateLimitingUsedInReadWriteTxCreation() {
90         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
91
92             txChainProxy.newReadWriteTransaction();
93
94             verify(mockActorContext, times(1)).acquireTxCreationPermit();
95         }
96     }
97
98     @Test
99     public void testRateLimitingUsedInWriteOnlyTxCreation() {
100         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
101
102             txChainProxy.newWriteOnlyTransaction();
103
104             verify(mockActorContext, times(1)).acquireTxCreationPermit();
105         }
106     }
107
108     @Test
109     public void testRateLimitingNotUsedInReadOnlyTxCreation() {
110         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
111
112             txChainProxy.newReadOnlyTransaction();
113
114             verify(mockActorContext, times(0)).acquireTxCreationPermit();
115         }
116     }
117
118     /**
119      * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
120      * initiated until the first one completes its read future.
121      */
122     @Test
123     public void testChainedWriteOnlyTransactions() throws Exception {
124         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
125
126         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
127
128             ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
129
130             Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
131             doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
132                     eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
133
134             DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
135
136             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
137             writeTx1.write(TestModel.TEST_PATH, writeNode1);
138
139             writeTx1.ready();
140
141             verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
142
143             verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
144
145             ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
146
147             expectBatchedModifications(txActorRef2, 1);
148
149             final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
150
151             final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
152
153             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
154             final CountDownLatch write2Complete = new CountDownLatch(1);
155             new Thread() {
156                 @Override
157                 public void run() {
158                     try {
159                         writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
160                     } catch (Exception e) {
161                         caughtEx.set(e);
162                     } finally {
163                         write2Complete.countDown();
164                     }
165                 }
166             }.start();
167
168             assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
169
170             if (caughtEx.get() != null) {
171                 throw caughtEx.get();
172             }
173
174             try {
175                 verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
176             } catch (AssertionError e) {
177                 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
178             }
179
180             batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
181
182             // Tx 2 should've proceeded to find the primary shard.
183             verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(
184                     eq(DefaultShardStrategy.DEFAULT_SHARD));
185         }
186     }
187
188     /**
189      * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
190      * initiated until the first one completes its read future.
191      */
192     @Test
193     public void testChainedReadWriteTransactions() throws Exception {
194         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
195
196             ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
197
198             expectBatchedModifications(txActorRef1, 1);
199
200             Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
201             doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
202                     eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
203
204             DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
205
206             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
207             writeTx1.write(TestModel.TEST_PATH, writeNode1);
208
209             writeTx1.ready();
210
211             verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
212
213             String tx2MemberName = "mock-member";
214             ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
215             ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
216                     DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
217
218             expectBatchedModifications(txActorRef2, 1);
219
220             final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
221
222             final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
223
224             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
225             final CountDownLatch write2Complete = new CountDownLatch(1);
226             new Thread() {
227                 @Override
228                 public void run() {
229                     try {
230                         writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
231                     } catch (Exception e) {
232                         caughtEx.set(e);
233                     } finally {
234                         write2Complete.countDown();
235                     }
236                 }
237             }.start();
238
239             assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
240
241             if (caughtEx.get() != null) {
242                 throw caughtEx.get();
243             }
244
245             try {
246                 verify(mockActorContext, never()).executeOperationAsync(
247                         eq(getSystem().actorSelection(shardActorRef2.path())),
248                         eqCreateTransaction(tx2MemberName, READ_WRITE));
249             } catch (AssertionError e) {
250                 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
251             }
252
253             readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
254
255             verify(mockActorContext, timeout(5000)).executeOperationAsync(
256                     eq(getSystem().actorSelection(shardActorRef2.path())),
257                     eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class));
258         }
259     }
260
261     @Test(expected = IllegalStateException.class)
262     public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
263         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
264
265         expectBatchedModifications(actorRef, 1);
266
267         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
268
269             DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
270
271             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
272             writeTx1.write(TestModel.TEST_PATH, writeNode1);
273
274             NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
275
276             txChainProxy.newWriteOnlyTransaction();
277         }
278     }
279 }