Bump odlparent/yangtools/mdsal
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import static org.mockito.Mockito.inOrder;
17 import static org.mockito.Mockito.mock;
18
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.actor.Status.Failure;
22 import akka.actor.Terminated;
23 import akka.dispatch.Dispatchers;
24 import akka.testkit.TestActorRef;
25 import akka.testkit.javadsl.TestKit;
26 import com.google.common.base.Throwables;
27 import java.time.Duration;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.InOrder;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
35 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
40 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
45 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.raft.TestActorFactory;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
55 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
56
57 public class ShardTransactionTest extends AbstractActorTest {
58
59     private static final TransactionType RO = TransactionType.READ_ONLY;
60     private static final TransactionType RW = TransactionType.READ_WRITE;
61     private static final TransactionType WO = TransactionType.WRITE_ONLY;
62
63     private static final ShardIdentifier SHARD_IDENTIFIER =
64         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
65     private static final EffectiveModelContext TEST_MODEL = TestModel.createTestContext();
66
67     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
68
69     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
70
71     private TestActorRef<Shard> shard;
72     private ShardDataTree store;
73     private TestKit testKit;
74
75     @Before
76     public void setUp() {
77         shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
78                 .schemaContextProvider(() -> TEST_MODEL).props()
79                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
80         ShardTestKit.waitUntilLeader(shard);
81         store = shard.underlyingActor().getDataStore();
82         testKit = new TestKit(getSystem());
83     }
84
85     private ActorRef newTransactionActor(final TransactionType type,
86             final AbstractShardDataTreeTransaction<?> transaction, final String name) {
87         Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
88                 shard.underlyingActor().getShardMBean());
89         return actorFactory.createActorNoVerify(props, name);
90     }
91
92     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
93         return store.newReadOnlyTransaction(nextTransactionId());
94     }
95
96     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
97         return store.newReadWriteTransaction(nextTransactionId());
98     }
99
100     @Test
101     public void testOnReceiveReadData() {
102         testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
103         testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
104     }
105
106     private void testOnReceiveReadData(final ActorRef transaction) {
107         transaction.tell(new ReadData(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
108             testKit.getRef());
109
110         ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
111
112         assertNotNull(reply.getNormalizedNode());
113     }
114
115     @Test
116     public void testOnReceiveReadDataWhenDataNotFound() {
117         testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
118             "testReadDataWhenDataNotFoundRO"));
119         testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
120             "testReadDataWhenDataNotFoundRW"));
121     }
122
123     private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
124         transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
125
126         ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
127
128         assertNull(reply.getNormalizedNode());
129     }
130
131     @Test
132     public void testOnReceiveDataExistsPositive() {
133         testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
134         testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
135     }
136
137     private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
138         transaction.tell(new DataExists(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
139             testKit.getRef());
140
141         DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
142
143         assertTrue(reply.exists());
144     }
145
146     @Test
147     public void testOnReceiveDataExistsNegative() {
148         testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
149         testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
150     }
151
152     private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
153         transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
154
155         DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
156
157         assertFalse(reply.exists());
158     }
159
160     @Test
161     public void testOnReceiveBatchedModifications() {
162         ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
163         DataTreeModification mockModification = mock(DataTreeModification.class);
164         ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
165             nextTransactionId(), mockModification);
166         final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
167
168         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
169         NormalizedNode writeData = ImmutableContainerNodeBuilder.create()
170                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
171                 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
172
173         YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
174         NormalizedNode mergeData = ImmutableContainerNodeBuilder.create()
175                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
176                 .build();
177
178         YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
179
180         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
181             DataStoreVersions.CURRENT_VERSION);
182         batched.addModification(new WriteModification(writePath, writeData));
183         batched.addModification(new MergeModification(mergePath, mergeData));
184         batched.addModification(new DeleteModification(deletePath));
185
186         transaction.tell(batched, testKit.getRef());
187
188         BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
189             BatchedModificationsReply.class);
190         assertEquals("getNumBatched", 3, reply.getNumBatched());
191
192         InOrder inOrder = inOrder(mockModification);
193         inOrder.verify(mockModification).write(writePath, writeData);
194         inOrder.verify(mockModification).merge(mergePath, mergeData);
195         inOrder.verify(mockModification).delete(deletePath);
196     }
197
198     @Test
199     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
200         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
201                 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
202
203         TestKit watcher = new TestKit(getSystem());
204         watcher.watch(transaction);
205
206         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
207         NormalizedNode writeData = ImmutableContainerNodeBuilder.create()
208                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
209                 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
210
211         final TransactionIdentifier tx1 = nextTransactionId();
212         BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
213         batched.addModification(new WriteModification(writePath, writeData));
214
215         transaction.tell(batched, testKit.getRef());
216         BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
217             BatchedModificationsReply.class);
218         assertEquals("getNumBatched", 1, reply.getNumBatched());
219
220         batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
221         batched.setReady();
222         batched.setTotalMessagesSent(2);
223
224         transaction.tell(batched, testKit.getRef());
225         testKit.expectMsgClass(Duration.ofSeconds(5), ReadyTransactionReply.class);
226         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
227     }
228
229     @Test
230     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
231         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
232                 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
233
234         TestKit watcher = new TestKit(getSystem());
235         watcher.watch(transaction);
236
237         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
238         NormalizedNode writeData = ImmutableContainerNodeBuilder.create()
239                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
240                 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
241
242         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
243             DataStoreVersions.CURRENT_VERSION);
244         batched.addModification(new WriteModification(writePath, writeData));
245         batched.setReady();
246         batched.setDoCommitOnReady(true);
247         batched.setTotalMessagesSent(1);
248
249         transaction.tell(batched, testKit.getRef());
250         testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
251         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
252     }
253
254     @Test(expected = TestException.class)
255     public void testOnReceiveBatchedModificationsFailure() throws Exception {
256         ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
257         DataTreeModification mockModification = mock(DataTreeModification.class);
258         ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
259             nextTransactionId(), mockModification);
260         final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
261             "testOnReceiveBatchedModificationsFailure");
262
263         TestKit watcher = new TestKit(getSystem());
264         watcher.watch(transaction);
265
266         YangInstanceIdentifier path = TestModel.TEST_PATH;
267         ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
268
269         doThrow(new TestException()).when(mockModification).write(path, node);
270
271         final TransactionIdentifier tx1 = nextTransactionId();
272         BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
273         batched.addModification(new WriteModification(path, node));
274
275         transaction.tell(batched, testKit.getRef());
276         testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
277
278         batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
279         batched.setReady();
280         batched.setTotalMessagesSent(2);
281
282         transaction.tell(batched, testKit.getRef());
283         Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
284         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
285
286         if (failure != null) {
287             Throwables.propagateIfPossible(failure.cause(), Exception.class);
288             throw new RuntimeException(failure.cause());
289         }
290     }
291
292     @Test(expected = IllegalStateException.class)
293     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
294         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
295                 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
296
297         TestKit watcher = new TestKit(getSystem());
298         watcher.watch(transaction);
299
300         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
301             DataStoreVersions.CURRENT_VERSION);
302         batched.setReady();
303         batched.setTotalMessagesSent(2);
304
305         transaction.tell(batched, testKit.getRef());
306
307         Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
308         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
309
310         if (failure != null) {
311             Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
312             Throwables.throwIfUnchecked(failure.cause());
313             throw new RuntimeException(failure.cause());
314         }
315     }
316
317     @Test
318     public void testReadWriteTxOnReceiveCloseTransaction() {
319         final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
320                 "testReadWriteTxOnReceiveCloseTransaction");
321
322         testKit.watch(transaction);
323
324         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
325
326         testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
327         testKit.expectTerminated(Duration.ofSeconds(3), transaction);
328     }
329
330     @Test
331     public void testWriteOnlyTxOnReceiveCloseTransaction() {
332         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
333                 "testWriteTxOnReceiveCloseTransaction");
334
335         testKit.watch(transaction);
336
337         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
338
339         testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
340         testKit.expectTerminated(Duration.ofSeconds(3), transaction);
341     }
342
343     @Test
344     public void testReadOnlyTxOnReceiveCloseTransaction() {
345         final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
346                 "testReadOnlyTxOnReceiveCloseTransaction");
347
348         testKit.watch(transaction);
349
350         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
351
352         testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
353     }
354
355     @Test
356     public void testShardTransactionInactivity() {
357         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
358                 500, TimeUnit.MILLISECONDS).build();
359
360         final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
361             "testShardTransactionInactivity");
362
363         testKit.watch(transaction);
364
365         testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
366     }
367
368     public static class TestException extends RuntimeException {
369         private static final long serialVersionUID = 1L;
370     }
371 }