66b8c0ec8df3dfd353d39ba5887f04bce65b8000
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.eq;
15 import static org.mockito.Matchers.isA;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
23 import akka.actor.ActorRef;
24 import akka.util.Timeout;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicReference;
28 import java.util.function.Function;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
32 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
33 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
34 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
41 import scala.concurrent.Promise;
42
43 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
44
45     @SuppressWarnings("resource")
46     @Test
47     public void testNewReadOnlyTransaction() throws Exception {
48
49         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadOnlyTransaction();
50         Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
51
52     }
53
54     @SuppressWarnings("resource")
55     @Test
56     public void testNewReadWriteTransaction() throws Exception {
57         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadWriteTransaction();
58         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
59
60     }
61
62     @SuppressWarnings("resource")
63     @Test
64     public void testNewWriteOnlyTransaction() throws Exception {
65         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newWriteOnlyTransaction();
66         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
67
68     }
69
70     @Test
71     public void testClose() throws Exception {
72         new TransactionChainProxy(mockComponentFactory).close();
73
74         verify(mockActorContext, times(1)).broadcast(any(Function.class));
75     }
76
77     @Test
78     public void testTransactionChainsHaveUniqueId() {
79         try (TransactionChainProxy one = new TransactionChainProxy(mockComponentFactory)) {
80             try (TransactionChainProxy two = new TransactionChainProxy(mockComponentFactory)) {
81
82                 Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
83             }
84         }
85     }
86
87     @Test
88     public void testRateLimitingUsedInReadWriteTxCreation() {
89         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
90
91             txChainProxy.newReadWriteTransaction();
92
93             verify(mockActorContext, times(1)).acquireTxCreationPermit();
94         }
95     }
96
97     @Test
98     public void testRateLimitingUsedInWriteOnlyTxCreation() {
99         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
100
101             txChainProxy.newWriteOnlyTransaction();
102
103             verify(mockActorContext, times(1)).acquireTxCreationPermit();
104         }
105     }
106
107     @Test
108     public void testRateLimitingNotUsedInReadOnlyTxCreation() {
109         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
110
111             txChainProxy.newReadOnlyTransaction();
112
113             verify(mockActorContext, times(0)).acquireTxCreationPermit();
114         }
115     }
116
117     /**
118      * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
119      * initiated until the first one completes its read future.
120      */
121     @Test
122     public void testChainedWriteOnlyTransactions() throws Exception {
123         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
124
125         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
126
127             ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
128
129             Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
130             doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
131                     eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
132
133             DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
134
135             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
136             writeTx1.write(TestModel.TEST_PATH, writeNode1);
137
138             writeTx1.ready();
139
140             verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
141
142             verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
143
144             ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
145
146             expectBatchedModifications(txActorRef2, 1);
147
148             final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
149
150             final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
151
152             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
153             final CountDownLatch write2Complete = new CountDownLatch(1);
154             new Thread() {
155                 @Override
156                 public void run() {
157                     try {
158                         writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
159                     } catch (Exception e) {
160                         caughtEx.set(e);
161                     } finally {
162                         write2Complete.countDown();
163                     }
164                 }
165             }.start();
166
167             assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
168
169             if (caughtEx.get() != null) {
170                 throw caughtEx.get();
171             }
172
173             try {
174                 verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
175             } catch (AssertionError e) {
176                 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
177             }
178
179             batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
180
181             // Tx 2 should've proceeded to find the primary shard.
182             verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(
183                     eq(DefaultShardStrategy.DEFAULT_SHARD));
184         }
185     }
186
187     /**
188      * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
189      * initiated until the first one completes its read future.
190      */
191     @Test
192     public void testChainedReadWriteTransactions() throws Exception {
193         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
194
195             ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
196
197             expectBatchedModifications(txActorRef1, 1);
198
199             Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
200             doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
201                     eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
202
203             DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
204
205             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
206             writeTx1.write(TestModel.TEST_PATH, writeNode1);
207
208             writeTx1.ready();
209
210             verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
211
212             String tx2MemberName = "mock-member";
213             ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
214             ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
215                     DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
216
217             expectBatchedModifications(txActorRef2, 1);
218
219             final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
220
221             final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
222
223             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
224             final CountDownLatch write2Complete = new CountDownLatch(1);
225             new Thread() {
226                 @Override
227                 public void run() {
228                     try {
229                         writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
230                     } catch (Exception e) {
231                         caughtEx.set(e);
232                     } finally {
233                         write2Complete.countDown();
234                     }
235                 }
236             }.start();
237
238             assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
239
240             if (caughtEx.get() != null) {
241                 throw caughtEx.get();
242             }
243
244             try {
245                 verify(mockActorContext, never()).executeOperationAsync(
246                         eq(getSystem().actorSelection(shardActorRef2.path())),
247                         eqCreateTransaction(tx2MemberName, READ_WRITE));
248             } catch (AssertionError e) {
249                 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
250             }
251
252             readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
253
254             verify(mockActorContext, timeout(5000)).executeOperationAsync(
255                     eq(getSystem().actorSelection(shardActorRef2.path())),
256                     eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class));
257         }
258     }
259
260     @Test(expected = IllegalStateException.class)
261     public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
262         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
263
264         expectBatchedModifications(actorRef, 1);
265
266         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
267
268             DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
269
270             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
271             writeTx1.write(TestModel.TEST_PATH, writeNode1);
272
273             NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
274
275             txChainProxy.newWriteOnlyTransaction();
276         }
277     }
278 }