BUG-5903: do not rely on primary info on failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.eq;
15 import static org.mockito.Matchers.isA;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
23 import akka.actor.ActorRef;
24 import akka.util.Timeout;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicReference;
28 import java.util.function.Function;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
33 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
34 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
35 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
42 import scala.concurrent.Promise;
43
44 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
45     private LocalHistoryIdentifier historyId;
46
47     @Override
48     public void setUp() {
49         super.setUp();
50         historyId = MockIdentifiers.historyIdentifier(TransactionChainProxyTest.class, memberName);
51     }
52
53     @SuppressWarnings("resource")
54     @Test
55     public void testNewReadOnlyTransaction() {
56
57         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadOnlyTransaction();
58         Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
59
60     }
61
62     @SuppressWarnings("resource")
63     @Test
64     public void testNewReadWriteTransaction() {
65         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadWriteTransaction();
66         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
67
68     }
69
70     @SuppressWarnings("resource")
71     @Test
72     public void testNewWriteOnlyTransaction() {
73         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newWriteOnlyTransaction();
74         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
75
76     }
77
78     @Test
79     public void testClose() {
80         new TransactionChainProxy(mockComponentFactory, historyId).close();
81
82         verify(mockActorContext, times(1)).broadcast(any(Function.class), any(Class.class));
83     }
84
85     @Test
86     public void testRateLimitingUsedInReadWriteTxCreation() {
87         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
88
89             txChainProxy.newReadWriteTransaction();
90
91             verify(mockActorContext, times(1)).acquireTxCreationPermit();
92         }
93     }
94
95     @Test
96     public void testRateLimitingUsedInWriteOnlyTxCreation() {
97         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
98
99             txChainProxy.newWriteOnlyTransaction();
100
101             verify(mockActorContext, times(1)).acquireTxCreationPermit();
102         }
103     }
104
105     @Test
106     public void testRateLimitingNotUsedInReadOnlyTxCreation() {
107         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
108
109             txChainProxy.newReadOnlyTransaction();
110
111             verify(mockActorContext, times(0)).acquireTxCreationPermit();
112         }
113     }
114
115     /**
116      * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
117      * initiated until the first one completes its read future.
118      */
119     @Test
120     public void testChainedWriteOnlyTransactions() throws Exception {
121         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
122
123         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
124
125             ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
126
127             Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
128             doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
129                     eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
130
131             DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
132
133             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
134             writeTx1.write(TestModel.TEST_PATH, writeNode1);
135
136             writeTx1.ready();
137
138             verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
139
140             verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
141
142             ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
143
144             expectBatchedModifications(txActorRef2, 1);
145
146             final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
147
148             final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
149
150             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
151             final CountDownLatch write2Complete = new CountDownLatch(1);
152             new Thread() {
153                 @Override
154                 public void run() {
155                     try {
156                         writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
157                     } catch (Exception e) {
158                         caughtEx.set(e);
159                     } finally {
160                         write2Complete.countDown();
161                     }
162                 }
163             }.start();
164
165             assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
166
167             if (caughtEx.get() != null) {
168                 throw caughtEx.get();
169             }
170
171             try {
172                 verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
173             } catch (AssertionError e) {
174                 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
175             }
176
177             batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
178
179             // Tx 2 should've proceeded to find the primary shard.
180             verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(
181                     eq(DefaultShardStrategy.DEFAULT_SHARD));
182         }
183     }
184
185     /**
186      * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
187      * initiated until the first one completes its read future.
188      */
189     @Test
190     public void testChainedReadWriteTransactions() throws Exception {
191         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
192
193             ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
194
195             expectBatchedModifications(txActorRef1, 1);
196
197             Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
198             doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
199                     eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
200
201             DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
202
203             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
204             writeTx1.write(TestModel.TEST_PATH, writeNode1);
205
206             writeTx1.ready();
207
208             verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
209
210             String tx2MemberName = "mock-member";
211             ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
212             ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
213                     DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
214
215             expectBatchedModifications(txActorRef2, 1);
216
217             final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
218
219             final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
220
221             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
222             final CountDownLatch write2Complete = new CountDownLatch(1);
223             new Thread() {
224                 @Override
225                 public void run() {
226                     try {
227                         writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
228                     } catch (Exception e) {
229                         caughtEx.set(e);
230                     } finally {
231                         write2Complete.countDown();
232                     }
233                 }
234             }.start();
235
236             assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
237
238             if (caughtEx.get() != null) {
239                 throw caughtEx.get();
240             }
241
242             try {
243                 verify(mockActorContext, never()).executeOperationAsync(
244                         eq(getSystem().actorSelection(shardActorRef2.path())),
245                         eqCreateTransaction(tx2MemberName, READ_WRITE));
246             } catch (AssertionError e) {
247                 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
248             }
249
250             readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
251
252             verify(mockActorContext, timeout(5000)).executeOperationAsync(
253                     eq(getSystem().actorSelection(shardActorRef2.path())),
254                     eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class));
255         }
256     }
257
258     @Test(expected = IllegalStateException.class)
259     public void testChainedWriteTransactionsWithPreviousTxNotReady() {
260         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
261
262         expectBatchedModifications(actorRef, 1);
263
264         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
265
266             DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
267
268             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
269             writeTx1.write(TestModel.TEST_PATH, writeNode1);
270
271             NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
272
273             txChainProxy.newWriteOnlyTransaction();
274         }
275     }
276 }