55060d155f2560436d7c7d75930ae24a6a5b7f75
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Ignore;
24 import org.junit.Test;
25 import org.mockito.InOrder;
26 import org.mockito.Mockito;
27 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
28 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
30 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
44 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
49 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
50 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
51 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53
54 public class ShardTransactionTest extends AbstractActorTest {
55
56     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
57     private static final TransactionType RO = TransactionType.READ_ONLY;
58     private static final TransactionType RW = TransactionType.READ_WRITE;
59     private static final TransactionType WO = TransactionType.WRITE_ONLY;
60
61     private static final ShardIdentifier SHARD_IDENTIFIER =
62         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
63
64
65     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
66
67     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
68
69     private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
70
71     private ActorRef createShard() {
72         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
73                 schemaContext(TestModel.createTestContext()).props());
74         ShardTestKit.waitUntilLeader(shard);
75         return shard;
76     }
77
78     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
79         return newTransactionActor(type, transaction, null, name);
80     }
81
82     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
83         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
84                 datastoreContext, shardStats);
85         return getSystem().actorOf(props, name);
86     }
87
88     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
89         return store.newReadOnlyTransaction(nextTransactionId());
90     }
91
92     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
93         return store.newReadWriteTransaction(nextTransactionId());
94     }
95
96     @Test
97     public void testOnReceiveReadData() throws Exception {
98         new JavaTestKit(getSystem()) {{
99             final ActorRef shard = createShard();
100
101             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
102
103             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
104         }
105
106         private void testOnReceiveReadData(final ActorRef transaction) {
107             transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY,
108                     DataStoreVersions.CURRENT_VERSION), getRef());
109
110             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
111
112             assertNotNull(reply.getNormalizedNode());
113         }};
114     }
115
116     @Test
117     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
118         new JavaTestKit(getSystem()) {{
119             final ActorRef shard = createShard();
120
121             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
122                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
123
124             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
125                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
126         }
127
128         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
129             transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
130
131             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
132
133             assertTrue(reply.getNormalizedNode() == null);
134         }};
135     }
136
137     @Test
138     public void testOnReceiveDataExistsPositive() throws Exception {
139         new JavaTestKit(getSystem()) {{
140             final ActorRef shard = createShard();
141
142             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
143                     "testDataExistsPositiveRO"));
144
145             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
146                     "testDataExistsPositiveRW"));
147         }
148
149         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
150             transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY,
151                     DataStoreVersions.CURRENT_VERSION), getRef());
152
153             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
154
155             assertTrue(reply.exists());
156         }};
157     }
158
159     @Test
160     public void testOnReceiveDataExistsNegative() throws Exception {
161         new JavaTestKit(getSystem()) {{
162             final ActorRef shard = createShard();
163
164             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
165                     "testDataExistsNegativeRO"));
166
167             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
168                     "testDataExistsNegativeRW"));
169         }
170
171         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
172             transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
173
174             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
175
176             assertFalse(reply.exists());
177         }};
178     }
179
180     @Test
181     public void testOnReceiveBatchedModifications() throws Exception {
182         new JavaTestKit(getSystem()) {{
183
184             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
185             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
186             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
187                 nextTransactionId(), mockModification);
188             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
189
190             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
191             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
192                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
193                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
194
195             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
196             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
197                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
198
199             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
200
201             BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
202             batched.addModification(new WriteModification(writePath, writeData));
203             batched.addModification(new MergeModification(mergePath, mergeData));
204             batched.addModification(new DeleteModification(deletePath));
205
206             transaction.tell(batched, getRef());
207
208             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
209             assertEquals("getNumBatched", 3, reply.getNumBatched());
210
211             InOrder inOrder = Mockito.inOrder(mockModification);
212             inOrder.verify(mockModification).write(writePath, writeData);
213             inOrder.verify(mockModification).merge(mergePath, mergeData);
214             inOrder.verify(mockModification).delete(deletePath);
215         }};
216     }
217
218     @Test
219     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
220         new JavaTestKit(getSystem()) {{
221
222             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
223                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
224
225             JavaTestKit watcher = new JavaTestKit(getSystem());
226             watcher.watch(transaction);
227
228             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
229             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
230                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
231                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
232
233             final TransactionIdentifier tx1 = nextTransactionId();
234             BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
235             batched.addModification(new WriteModification(writePath, writeData));
236
237             transaction.tell(batched, getRef());
238             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
239             assertEquals("getNumBatched", 1, reply.getNumBatched());
240
241             batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
242             batched.setReady(true);
243             batched.setTotalMessagesSent(2);
244
245             transaction.tell(batched, getRef());
246             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
247             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
248         }};
249     }
250
251     @Test
252     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
253         new JavaTestKit(getSystem()) {{
254
255             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
256                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
257
258             JavaTestKit watcher = new JavaTestKit(getSystem());
259             watcher.watch(transaction);
260
261             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
262             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
263                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
264                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
265
266             BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
267             batched.addModification(new WriteModification(writePath, writeData));
268             batched.setReady(true);
269             batched.setDoCommitOnReady(true);
270             batched.setTotalMessagesSent(1);
271
272             transaction.tell(batched, getRef());
273             expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
274             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
275         }};
276     }
277
278     @Test(expected=TestException.class)
279     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
280         new JavaTestKit(getSystem()) {{
281
282             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
283             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
284             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
285                 nextTransactionId(), mockModification);
286             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
287                     "testOnReceiveBatchedModificationsFailure");
288
289             JavaTestKit watcher = new JavaTestKit(getSystem());
290             watcher.watch(transaction);
291
292             YangInstanceIdentifier path = TestModel.TEST_PATH;
293             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
294
295             doThrow(new TestException()).when(mockModification).write(path, node);
296
297             final TransactionIdentifier tx1 = nextTransactionId();
298             BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
299             batched.addModification(new WriteModification(path, node));
300
301             transaction.tell(batched, getRef());
302             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
303
304             batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
305             batched.setReady(true);
306             batched.setTotalMessagesSent(2);
307
308             transaction.tell(batched, getRef());
309             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
310             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
311
312             if(failure != null) {
313                 throw failure.cause();
314             }
315         }};
316     }
317
318     @Test(expected=IllegalStateException.class)
319     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
320         new JavaTestKit(getSystem()) {{
321
322             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
323                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
324
325             JavaTestKit watcher = new JavaTestKit(getSystem());
326             watcher.watch(transaction);
327
328             BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
329             batched.setReady(true);
330             batched.setTotalMessagesSent(2);
331
332             transaction.tell(batched, getRef());
333
334             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
335             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
336
337             if(failure != null) {
338                 throw failure.cause();
339             }
340         }};
341     }
342
343     @Test
344     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
345         new JavaTestKit(getSystem()) {{
346             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
347                     "testReadWriteTxOnReceiveCloseTransaction");
348
349             watch(transaction);
350
351             transaction.tell(new CloseTransaction().toSerializable(), getRef());
352
353             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
354             expectTerminated(duration("3 seconds"), transaction);
355         }};
356     }
357
358     @Test
359     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
360         new JavaTestKit(getSystem()) {{
361             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
362                     "testWriteTxOnReceiveCloseTransaction");
363
364             watch(transaction);
365
366             transaction.tell(new CloseTransaction().toSerializable(), getRef());
367
368             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
369             expectTerminated(duration("3 seconds"), transaction);
370         }};
371     }
372
373     @Test
374     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
375         new JavaTestKit(getSystem()) {{
376             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
377                     "testReadOnlyTxOnReceiveCloseTransaction");
378
379             watch(transaction);
380
381             transaction.tell(new CloseTransaction().toSerializable(), getRef());
382
383             expectMsgClass(duration("3 seconds"), Terminated.class);
384         }};
385     }
386
387     // Unknown operations are being logged
388     @Ignore
389     @Test(expected=UnknownMessageException.class)
390     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
391         final ActorRef shard = createShard();
392         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
393                 datastoreContext, shardStats);
394         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
395
396         transaction.receive(new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION),
397                 ActorRef.noSender());
398     }
399
400     @Test
401     public void testShardTransactionInactivity() {
402
403         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
404                 500, TimeUnit.MILLISECONDS).build();
405
406         new JavaTestKit(getSystem()) {{
407             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
408                     "testShardTransactionInactivity");
409
410             watch(transaction);
411
412             expectMsgClass(duration("3 seconds"), Terminated.class);
413         }};
414     }
415     public static class TestException extends RuntimeException {
416         private static final long serialVersionUID = 1L;
417     }
418 }