BUG-2138: DistributedShardListeners support for nested shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.eq;
15 import static org.mockito.Matchers.isA;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
22 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
23
24 import akka.actor.ActorRef;
25 import akka.util.Timeout;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicReference;
29 import java.util.function.Function;
30 import org.junit.Assert;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
34 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
35 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
36 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import scala.concurrent.Promise;
44
45 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
46     private LocalHistoryIdentifier historyId;
47
48     @Override
49     public void setUp() {
50         super.setUp();
51         historyId = MockIdentifiers.historyIdentifier(TransactionChainProxyTest.class, memberName);
52     }
53
54     @SuppressWarnings("resource")
55     @Test
56     public void testNewReadOnlyTransaction() {
57
58         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadOnlyTransaction();
59         Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
60
61     }
62
63     @SuppressWarnings("resource")
64     @Test
65     public void testNewReadWriteTransaction() {
66         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadWriteTransaction();
67         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
68
69     }
70
71     @SuppressWarnings("resource")
72     @Test
73     public void testNewWriteOnlyTransaction() {
74         DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newWriteOnlyTransaction();
75         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
76
77     }
78
79     @SuppressWarnings("unchecked")
80     @Test
81     public void testClose() {
82         new TransactionChainProxy(mockComponentFactory, historyId).close();
83
84         verify(mockActorContext, times(1)).broadcast(any(Function.class), any(Class.class));
85     }
86
87     @Test
88     public void testRateLimitingUsedInReadWriteTxCreation() {
89         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
90
91             txChainProxy.newReadWriteTransaction();
92
93             verify(mockActorContext, times(1)).acquireTxCreationPermit();
94         }
95     }
96
97     @Test
98     public void testRateLimitingUsedInWriteOnlyTxCreation() {
99         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
100
101             txChainProxy.newWriteOnlyTransaction();
102
103             verify(mockActorContext, times(1)).acquireTxCreationPermit();
104         }
105     }
106
107     @Test
108     public void testRateLimitingNotUsedInReadOnlyTxCreation() {
109         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
110
111             txChainProxy.newReadOnlyTransaction();
112
113             verify(mockActorContext, times(0)).acquireTxCreationPermit();
114         }
115     }
116
117     /**
118      * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
119      * initiated until the first one completes its read future.
120      */
121     @Test
122     @SuppressWarnings("checkstyle:IllegalCatch")
123     public void testChainedWriteOnlyTransactions() throws Exception {
124         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
125
126         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
127
128             ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
129
130             Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
131             doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
132                     eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
133
134             DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
135
136             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
137             writeTx1.write(TestModel.TEST_PATH, writeNode1);
138
139             writeTx1.ready();
140
141             verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
142
143             verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
144
145             ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
146
147             expectBatchedModifications(txActorRef2, 1);
148
149             final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
150
151             final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
152
153             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
154             final CountDownLatch write2Complete = new CountDownLatch(1);
155             new Thread() {
156                 @Override
157                 public void run() {
158                     try {
159                         writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
160                     } catch (Exception e) {
161                         caughtEx.set(e);
162                     } finally {
163                         write2Complete.countDown();
164                     }
165                 }
166             }.start();
167
168             assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
169
170             if (caughtEx.get() != null) {
171                 throw caughtEx.get();
172             }
173
174             try {
175                 verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
176             } catch (AssertionError e) {
177                 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
178             }
179
180             batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
181
182             // Tx 2 should've proceeded to find the primary shard.
183             verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(
184                     eq(DefaultShardStrategy.DEFAULT_SHARD));
185         }
186     }
187
188     /**
189      * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
190      * initiated until the first one completes its read future.
191      */
192     @Test
193     @SuppressWarnings("checkstyle:IllegalCatch")
194     public void testChainedReadWriteTransactions() throws Exception {
195         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
196
197             ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
198
199             expectBatchedModifications(txActorRef1, 1);
200
201             Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
202             doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
203                     eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
204
205             DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
206
207             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
208             writeTx1.write(TestModel.TEST_PATH, writeNode1);
209
210             writeTx1.ready();
211
212             verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
213
214             String tx2MemberName = "mock-member";
215             ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
216             ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
217                     DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
218
219             expectBatchedModifications(txActorRef2, 1);
220
221             final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
222
223             final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
224
225             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
226             final CountDownLatch write2Complete = new CountDownLatch(1);
227             new Thread() {
228                 @Override
229                 public void run() {
230                     try {
231                         writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
232                     } catch (Exception e) {
233                         caughtEx.set(e);
234                     } finally {
235                         write2Complete.countDown();
236                     }
237                 }
238             }.start();
239
240             assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
241
242             if (caughtEx.get() != null) {
243                 throw caughtEx.get();
244             }
245
246             try {
247                 verify(mockActorContext, never()).executeOperationAsync(
248                         eq(getSystem().actorSelection(shardActorRef2.path())),
249                         eqCreateTransaction(tx2MemberName, READ_WRITE));
250             } catch (AssertionError e) {
251                 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
252             }
253
254             readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
255
256             verify(mockActorContext, timeout(5000)).executeOperationAsync(
257                     eq(getSystem().actorSelection(shardActorRef2.path())),
258                     eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class));
259         }
260     }
261
262     @Test(expected = IllegalStateException.class)
263     public void testChainedWriteTransactionsWithPreviousTxNotReady() {
264         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
265
266         expectBatchedModifications(actorRef, 1);
267
268         try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
269
270             DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
271
272             NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
273             writeTx1.write(TestModel.TEST_PATH, writeNode1);
274
275             txChainProxy.newWriteOnlyTransaction();
276         }
277     }
278 }