Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import static org.mockito.Mockito.inOrder;
17 import static org.mockito.Mockito.mock;
18
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.actor.Status.Failure;
22 import akka.actor.Terminated;
23 import akka.dispatch.Dispatchers;
24 import akka.testkit.TestActorRef;
25 import akka.testkit.javadsl.TestKit;
26 import com.google.common.base.Throwables;
27 import java.time.Duration;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.InOrder;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
35 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
40 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
45 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.raft.TestActorFactory;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
54 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
55 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
56 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
57
58 @Deprecated(since = "9.0.0", forRemoval = true)
59 public class ShardTransactionTest extends AbstractActorTest {
60
61     private static final TransactionType RO = TransactionType.READ_ONLY;
62     private static final TransactionType RW = TransactionType.READ_WRITE;
63     private static final TransactionType WO = TransactionType.WRITE_ONLY;
64
65     private static final ShardIdentifier SHARD_IDENTIFIER =
66         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
67     private static final EffectiveModelContext TEST_MODEL = TestModel.createTestContext();
68
69     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
70
71     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
72
73     private TestActorRef<Shard> shard;
74     private ShardDataTree store;
75     private TestKit testKit;
76
77     @Before
78     public void setUp() {
79         shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
80                 .schemaContextProvider(() -> TEST_MODEL).props()
81                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
82         ShardTestKit.waitUntilLeader(shard);
83         store = shard.underlyingActor().getDataStore();
84         testKit = new TestKit(getSystem());
85     }
86
87     private ActorRef newTransactionActor(final TransactionType type,
88             final AbstractShardDataTreeTransaction<?> transaction, final String name) {
89         Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
90                 shard.underlyingActor().getShardMBean());
91         return actorFactory.createActorNoVerify(props, name);
92     }
93
94     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
95         return store.newReadOnlyTransaction(nextTransactionId());
96     }
97
98     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
99         return store.newReadWriteTransaction(nextTransactionId());
100     }
101
102     @Test
103     public void testOnReceiveReadData() {
104         testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
105         testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
106     }
107
108     private void testOnReceiveReadData(final ActorRef transaction) {
109         transaction.tell(new ReadData(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION),
110             testKit.getRef());
111
112         ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
113
114         assertNotNull(reply.getNormalizedNode());
115     }
116
117     @Test
118     public void testOnReceiveReadDataWhenDataNotFound() {
119         testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
120             "testReadDataWhenDataNotFoundRO"));
121         testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
122             "testReadDataWhenDataNotFoundRW"));
123     }
124
125     private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
126         transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
127
128         ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
129
130         assertNull(reply.getNormalizedNode());
131     }
132
133     @Test
134     public void testOnReceiveDataExistsPositive() {
135         testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
136         testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
137     }
138
139     private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
140         transaction.tell(new DataExists(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION),
141             testKit.getRef());
142
143         DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
144
145         assertTrue(reply.exists());
146     }
147
148     @Test
149     public void testOnReceiveDataExistsNegative() {
150         testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
151         testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
152     }
153
154     private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
155         transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
156
157         DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
158
159         assertFalse(reply.exists());
160     }
161
162     @Test
163     public void testOnReceiveBatchedModifications() {
164         ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
165         DataTreeModification mockModification = mock(DataTreeModification.class);
166         ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
167             nextTransactionId(), mockModification);
168         final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
169
170         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
171         NormalizedNode writeData = Builders.containerBuilder()
172             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
173             .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
174             .build();
175
176         YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
177         NormalizedNode mergeData = Builders.containerBuilder()
178             .withNodeIdentifier(new NodeIdentifier(TestModel.OUTER_LIST_QNAME))
179             .build();
180
181         YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
182
183         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
184             DataStoreVersions.CURRENT_VERSION);
185         batched.addModification(new WriteModification(writePath, writeData));
186         batched.addModification(new MergeModification(mergePath, mergeData));
187         batched.addModification(new DeleteModification(deletePath));
188
189         transaction.tell(batched, testKit.getRef());
190
191         BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
192             BatchedModificationsReply.class);
193         assertEquals("getNumBatched", 3, reply.getNumBatched());
194
195         InOrder inOrder = inOrder(mockModification);
196         inOrder.verify(mockModification).write(writePath, writeData);
197         inOrder.verify(mockModification).merge(mergePath, mergeData);
198         inOrder.verify(mockModification).delete(deletePath);
199     }
200
201     @Test
202     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
203         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
204                 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
205
206         TestKit watcher = new TestKit(getSystem());
207         watcher.watch(transaction);
208
209         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
210         NormalizedNode writeData = Builders.containerBuilder()
211             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
212             .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
213             .build();
214
215         final TransactionIdentifier tx1 = nextTransactionId();
216         BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
217         batched.addModification(new WriteModification(writePath, writeData));
218
219         transaction.tell(batched, testKit.getRef());
220         BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
221             BatchedModificationsReply.class);
222         assertEquals("getNumBatched", 1, reply.getNumBatched());
223
224         batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
225         batched.setReady();
226         batched.setTotalMessagesSent(2);
227
228         transaction.tell(batched, testKit.getRef());
229         testKit.expectMsgClass(Duration.ofSeconds(5), ReadyTransactionReply.class);
230         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
231     }
232
233     @Test
234     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
235         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
236                 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
237
238         TestKit watcher = new TestKit(getSystem());
239         watcher.watch(transaction);
240
241         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
242         NormalizedNode writeData = Builders.containerBuilder()
243             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
244             .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
245             .build();
246
247         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
248             DataStoreVersions.CURRENT_VERSION);
249         batched.addModification(new WriteModification(writePath, writeData));
250         batched.setReady();
251         batched.setDoCommitOnReady(true);
252         batched.setTotalMessagesSent(1);
253
254         transaction.tell(batched, testKit.getRef());
255         testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
256         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
257     }
258
259     @Test(expected = TestException.class)
260     public void testOnReceiveBatchedModificationsFailure() throws Exception {
261         ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
262         DataTreeModification mockModification = mock(DataTreeModification.class);
263         ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
264             nextTransactionId(), mockModification);
265         final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
266             "testOnReceiveBatchedModificationsFailure");
267
268         TestKit watcher = new TestKit(getSystem());
269         watcher.watch(transaction);
270
271         YangInstanceIdentifier path = TestModel.TEST_PATH;
272         ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
273
274         doThrow(new TestException()).when(mockModification).write(path, node);
275
276         final TransactionIdentifier tx1 = nextTransactionId();
277         BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
278         batched.addModification(new WriteModification(path, node));
279
280         transaction.tell(batched, testKit.getRef());
281         testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
282
283         batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
284         batched.setReady();
285         batched.setTotalMessagesSent(2);
286
287         transaction.tell(batched, testKit.getRef());
288         Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
289         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
290
291         if (failure != null) {
292             Throwables.propagateIfPossible(failure.cause(), Exception.class);
293             throw new RuntimeException(failure.cause());
294         }
295     }
296
297     @Test(expected = IllegalStateException.class)
298     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
299         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
300                 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
301
302         TestKit watcher = new TestKit(getSystem());
303         watcher.watch(transaction);
304
305         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
306             DataStoreVersions.CURRENT_VERSION);
307         batched.setReady();
308         batched.setTotalMessagesSent(2);
309
310         transaction.tell(batched, testKit.getRef());
311
312         Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
313         watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
314
315         if (failure != null) {
316             Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
317             Throwables.throwIfUnchecked(failure.cause());
318             throw new RuntimeException(failure.cause());
319         }
320     }
321
322     @Test
323     public void testReadWriteTxOnReceiveCloseTransaction() {
324         final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
325                 "testReadWriteTxOnReceiveCloseTransaction");
326
327         testKit.watch(transaction);
328
329         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
330
331         testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
332         testKit.expectTerminated(Duration.ofSeconds(3), transaction);
333     }
334
335     @Test
336     public void testWriteOnlyTxOnReceiveCloseTransaction() {
337         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
338                 "testWriteTxOnReceiveCloseTransaction");
339
340         testKit.watch(transaction);
341
342         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
343
344         testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
345         testKit.expectTerminated(Duration.ofSeconds(3), transaction);
346     }
347
348     @Test
349     public void testReadOnlyTxOnReceiveCloseTransaction() {
350         final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
351                 "testReadOnlyTxOnReceiveCloseTransaction");
352
353         testKit.watch(transaction);
354
355         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
356
357         testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
358     }
359
360     @Test
361     public void testShardTransactionInactivity() {
362         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
363                 500, TimeUnit.MILLISECONDS).build();
364
365         final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
366             "testShardTransactionInactivity");
367
368         testKit.watch(transaction);
369
370         testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
371     }
372
373     public static class TestException extends RuntimeException {
374         private static final long serialVersionUID = 1L;
375     }
376 }