Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import static org.mockito.Mockito.inOrder;
17 import static org.mockito.Mockito.mock;
18
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.actor.Status.Failure;
22 import akka.actor.Terminated;
23 import akka.dispatch.Dispatchers;
24 import akka.testkit.TestActorRef;
25 import akka.testkit.javadsl.TestKit;
26 import com.google.common.base.Throwables;
27 import java.time.Duration;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.InOrder;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
35 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
40 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
45 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.raft.TestActorFactory;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
54 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
55 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
56 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
57
58 public class ShardTransactionTest extends AbstractActorTest {
59
60     private static final TransactionType RO = TransactionType.READ_ONLY;
61     private static final TransactionType RW = TransactionType.READ_WRITE;
62     private static final TransactionType WO = TransactionType.WRITE_ONLY;
63
64     private static final ShardIdentifier SHARD_IDENTIFIER =
65         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
66     private static final EffectiveModelContext TEST_MODEL = TestModel.createTestContext();
67
68     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
69
70     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
71
72     private TestActorRef<Shard> shard;
73     private ShardDataTree store;
74     private TestKit testKit;
75
76     @Before
77     public void setUp() {
78         shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
79                 .schemaContextProvider(() -> TEST_MODEL).props()
80                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
81         ShardTestKit.waitUntilLeader(shard);
82         store = shard.underlyingActor().getDataStore();
83         testKit = new TestKit(getSystem());
84     }
85
86     private ActorRef newTransactionActor(final TransactionType type,
87             final AbstractShardDataTreeTransaction<?> transaction, final String name) {
88         Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
89                 shard.underlyingActor().getShardMBean());
90         return actorFactory.createActorNoVerify(props, name);
91     }
92
93     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
94         return store.newReadOnlyTransaction(nextTransactionId());
95     }
96
97     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
98         return store.newReadWriteTransaction(nextTransactionId());
99     }
100
101     @Test
102     public void testOnReceiveReadData() {
103         testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
104         testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
105     }
106
107     private void testOnReceiveReadData(final ActorRef transaction) {
108         transaction.tell(new ReadData(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
109             testKit.getRef());
110
111         ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
112
113         assertNotNull(reply.getNormalizedNode());
114     }
115
116     @Test
117     public void testOnReceiveReadDataWhenDataNotFound() {
118         testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
119             "testReadDataWhenDataNotFoundRO"));
120         testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
121             "testReadDataWhenDataNotFoundRW"));
122     }
123
124     private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
125         transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
126
127         ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
128
129         assertNull(reply.getNormalizedNode());
130     }
131
132     @Test
133     public void testOnReceiveDataExistsPositive() {
134         testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
135         testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
136     }
137
138     private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
139         transaction.tell(new DataExists(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
140             testKit.getRef());
141
142         DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
143
144         assertTrue(reply.exists());
145     }
146
147     @Test
148     public void testOnReceiveDataExistsNegative() {
149         testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
150         testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
151     }
152
153     private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
154         transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
155
156         DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
157
158         assertFalse(reply.exists());
159     }
160
161     @Test
162     public void testOnReceiveBatchedModifications() {
163         ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
164         DataTreeModification mockModification = mock(DataTreeModification.class);
165         ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
166             nextTransactionId(), mockModification);
167         final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
168
169         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
170         NormalizedNode writeData = Builders.containerBuilder()
171             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
172             .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
173             .build();
174
175         YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
176         NormalizedNode mergeData = Builders.containerBuilder()
177             .withNodeIdentifier(new NodeIdentifier(TestModel.OUTER_LIST_QNAME))
178             .build();
179
180         YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
181
182         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
183             DataStoreVersions.CURRENT_VERSION);
184         batched.addModification(new WriteModification(writePath, writeData));
185         batched.addModification(new MergeModification(mergePath, mergeData));
186         batched.addModification(new DeleteModification(deletePath));
187
188         transaction.tell(batched, testKit.getRef());
189
190         BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
191             BatchedModificationsReply.class);
192         assertEquals("getNumBatched", 3, reply.getNumBatched());
193
194         InOrder inOrder = inOrder(mockModification);
195         inOrder.verify(mockModification).write(writePath, writeData);
196         inOrder.verify(mockModification).merge(mergePath, mergeData);
197         inOrder.verify(mockModification).delete(deletePath);
198     }
199
200     @Test
201     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
202         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
203                 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
204
205         TestKit watcher = new TestKit(getSystem());
206         watcher.watch(transaction);
207
208         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
209         NormalizedNode writeData = Builders.containerBuilder()
210             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
211             .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
212             .build();
213
214         final TransactionIdentifier tx1 = nextTransactionId();
215         BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
216         batched.addModification(new WriteModification(writePath, writeData));
217
218         transaction.tell(batched, testKit.getRef());
219         BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
220             BatchedModificationsReply.class);
221         assertEquals("getNumBatched", 1, reply.getNumBatched());
222
223         batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
224         batched.setReady();
225         batched.setTotalMessagesSent(2);
226
227         transaction.tell(batched, testKit.getRef());
228         testKit.expectMsgClass(Duration.ofSeconds(5), ReadyTransactionReply.class);
229         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
230     }
231
232     @Test
233     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
234         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
235                 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
236
237         TestKit watcher = new TestKit(getSystem());
238         watcher.watch(transaction);
239
240         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
241         NormalizedNode writeData = Builders.containerBuilder()
242             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
243             .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
244             .build();
245
246         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
247             DataStoreVersions.CURRENT_VERSION);
248         batched.addModification(new WriteModification(writePath, writeData));
249         batched.setReady();
250         batched.setDoCommitOnReady(true);
251         batched.setTotalMessagesSent(1);
252
253         transaction.tell(batched, testKit.getRef());
254         testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
255         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
256     }
257
258     @Test(expected = TestException.class)
259     public void testOnReceiveBatchedModificationsFailure() throws Exception {
260         ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
261         DataTreeModification mockModification = mock(DataTreeModification.class);
262         ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
263             nextTransactionId(), mockModification);
264         final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
265             "testOnReceiveBatchedModificationsFailure");
266
267         TestKit watcher = new TestKit(getSystem());
268         watcher.watch(transaction);
269
270         YangInstanceIdentifier path = TestModel.TEST_PATH;
271         ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
272
273         doThrow(new TestException()).when(mockModification).write(path, node);
274
275         final TransactionIdentifier tx1 = nextTransactionId();
276         BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
277         batched.addModification(new WriteModification(path, node));
278
279         transaction.tell(batched, testKit.getRef());
280         testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
281
282         batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
283         batched.setReady();
284         batched.setTotalMessagesSent(2);
285
286         transaction.tell(batched, testKit.getRef());
287         Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
288         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
289
290         if (failure != null) {
291             Throwables.propagateIfPossible(failure.cause(), Exception.class);
292             throw new RuntimeException(failure.cause());
293         }
294     }
295
296     @Test(expected = IllegalStateException.class)
297     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
298         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
299                 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
300
301         TestKit watcher = new TestKit(getSystem());
302         watcher.watch(transaction);
303
304         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
305             DataStoreVersions.CURRENT_VERSION);
306         batched.setReady();
307         batched.setTotalMessagesSent(2);
308
309         transaction.tell(batched, testKit.getRef());
310
311         Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
312         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
313
314         if (failure != null) {
315             Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
316             Throwables.throwIfUnchecked(failure.cause());
317             throw new RuntimeException(failure.cause());
318         }
319     }
320
321     @Test
322     public void testReadWriteTxOnReceiveCloseTransaction() {
323         final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
324                 "testReadWriteTxOnReceiveCloseTransaction");
325
326         testKit.watch(transaction);
327
328         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
329
330         testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
331         testKit.expectTerminated(Duration.ofSeconds(3), transaction);
332     }
333
334     @Test
335     public void testWriteOnlyTxOnReceiveCloseTransaction() {
336         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
337                 "testWriteTxOnReceiveCloseTransaction");
338
339         testKit.watch(transaction);
340
341         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
342
343         testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
344         testKit.expectTerminated(Duration.ofSeconds(3), transaction);
345     }
346
347     @Test
348     public void testReadOnlyTxOnReceiveCloseTransaction() {
349         final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
350                 "testReadOnlyTxOnReceiveCloseTransaction");
351
352         testKit.watch(transaction);
353
354         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
355
356         testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
357     }
358
359     @Test
360     public void testShardTransactionInactivity() {
361         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
362                 500, TimeUnit.MILLISECONDS).build();
363
364         final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
365             "testShardTransactionInactivity");
366
367         testKit.watch(transaction);
368
369         testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
370     }
371
372     public static class TestException extends RuntimeException {
373         private static final long serialVersionUID = 1L;
374     }
375 }