1ea06efcf0cdb5dadf2a4112c733ebeea3f317fe
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.access.concepts.MemberName;
27 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
28 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
29 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
44 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
45 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
46 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
47 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56
57 public class ShardTransactionTest extends AbstractActorTest {
58
59     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
60     private static final TransactionType RO = TransactionType.READ_ONLY;
61     private static final TransactionType RW = TransactionType.READ_WRITE;
62     private static final TransactionType WO = TransactionType.WRITE_ONLY;
63
64     private static final ShardIdentifier SHARD_IDENTIFIER =
65         ShardIdentifier.builder().memberName(MemberName.forName("member-1"))
66             .shardName("inventory").type("config").build();
67
68     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
69
70     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
71
72     private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
73
74     private int txCounter = 0;
75
76     private ActorRef createShard() {
77         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
78                 schemaContext(TestModel.createTestContext()).props());
79         ShardTestKit.waitUntilLeader(shard);
80         return shard;
81     }
82
83     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
84         return newTransactionActor(type, transaction, null, name);
85     }
86
87     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
88         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
89                 datastoreContext, shardStats, "txn");
90         return getSystem().actorOf(props, name);
91     }
92
93     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
94         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
95     }
96
97     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
98         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
99     }
100
101     @Test
102     public void testOnReceiveReadData() throws Exception {
103         new JavaTestKit(getSystem()) {{
104             final ActorRef shard = createShard();
105
106             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
107
108             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
109         }
110
111         private void testOnReceiveReadData(final ActorRef transaction) {
112             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build(),
113                     DataStoreVersions.CURRENT_VERSION),getRef());
114
115             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
116
117             assertNotNull(reply.getNormalizedNode());
118         }};
119     }
120
121     @Test
122     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
123         new JavaTestKit(getSystem()) {{
124             final ActorRef shard = createShard();
125
126             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
127                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
128
129             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
130                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
131         }
132
133         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
134             transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
135
136             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
137
138             assertTrue(reply.getNormalizedNode() == null);
139         }};
140     }
141
142     @Test
143     public void testOnReceiveDataExistsPositive() throws Exception {
144         new JavaTestKit(getSystem()) {{
145             final ActorRef shard = createShard();
146
147             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
148                     "testDataExistsPositiveRO"));
149
150             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
151                     "testDataExistsPositiveRW"));
152         }
153
154         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
155             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build(),
156                     DataStoreVersions.CURRENT_VERSION),getRef());
157
158             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
159
160             assertTrue(reply.exists());
161         }};
162     }
163
164     @Test
165     public void testOnReceiveDataExistsNegative() throws Exception {
166         new JavaTestKit(getSystem()) {{
167             final ActorRef shard = createShard();
168
169             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
170                     "testDataExistsNegativeRO"));
171
172             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
173                     "testDataExistsNegativeRW"));
174         }
175
176         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
177             transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
178
179             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
180
181             assertFalse(reply.exists());
182         }};
183     }
184
185     @Test
186     public void testOnReceiveBatchedModifications() throws Exception {
187         new JavaTestKit(getSystem()) {{
188
189             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
190             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
191             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
192             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
193
194             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
195             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
196                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
197                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
198
199             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
200             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
201                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
202
203             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
204
205             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
206             batched.addModification(new WriteModification(writePath, writeData));
207             batched.addModification(new MergeModification(mergePath, mergeData));
208             batched.addModification(new DeleteModification(deletePath));
209
210             transaction.tell(batched, getRef());
211
212             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
213             assertEquals("getNumBatched", 3, reply.getNumBatched());
214
215             InOrder inOrder = Mockito.inOrder(mockModification);
216             inOrder.verify(mockModification).write(writePath, writeData);
217             inOrder.verify(mockModification).merge(mergePath, mergeData);
218             inOrder.verify(mockModification).delete(deletePath);
219         }};
220     }
221
222     @Test
223     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
224         new JavaTestKit(getSystem()) {{
225
226             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
227                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
228
229             JavaTestKit watcher = new JavaTestKit(getSystem());
230             watcher.watch(transaction);
231
232             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
233             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
234                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
235                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
236
237             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
238             batched.addModification(new WriteModification(writePath, writeData));
239
240             transaction.tell(batched, getRef());
241             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
242             assertEquals("getNumBatched", 1, reply.getNumBatched());
243
244             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
245             batched.setReady(true);
246             batched.setTotalMessagesSent(2);
247
248             transaction.tell(batched, getRef());
249             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
250             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
251         }};
252     }
253
254     @Test
255     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
256         new JavaTestKit(getSystem()) {{
257
258             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
259                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
260
261             JavaTestKit watcher = new JavaTestKit(getSystem());
262             watcher.watch(transaction);
263
264             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
265             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
266                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
267                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
268
269             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
270             batched.addModification(new WriteModification(writePath, writeData));
271             batched.setReady(true);
272             batched.setDoCommitOnReady(true);
273             batched.setTotalMessagesSent(1);
274
275             transaction.tell(batched, getRef());
276             expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
277             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
278         }};
279     }
280
281     @Test(expected=TestException.class)
282     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
283         new JavaTestKit(getSystem()) {{
284
285             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
286             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
287             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
288             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
289                     "testOnReceiveBatchedModificationsFailure");
290
291             JavaTestKit watcher = new JavaTestKit(getSystem());
292             watcher.watch(transaction);
293
294             YangInstanceIdentifier path = TestModel.TEST_PATH;
295             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
296
297             doThrow(new TestException()).when(mockModification).write(path, node);
298
299             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
300             batched.addModification(new WriteModification(path, node));
301
302             transaction.tell(batched, getRef());
303             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
304
305             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
306             batched.setReady(true);
307             batched.setTotalMessagesSent(2);
308
309             transaction.tell(batched, getRef());
310             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
311             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
312
313             if(failure != null) {
314                 throw failure.cause();
315             }
316         }};
317     }
318
319     @Test(expected=IllegalStateException.class)
320     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
321         new JavaTestKit(getSystem()) {{
322
323             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
324                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
325
326             JavaTestKit watcher = new JavaTestKit(getSystem());
327             watcher.watch(transaction);
328
329             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
330             batched.setReady(true);
331             batched.setTotalMessagesSent(2);
332
333             transaction.tell(batched, getRef());
334
335             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
336             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
337
338             if(failure != null) {
339                 throw failure.cause();
340             }
341         }};
342     }
343
344     @Test
345     public void testOnReceiveCreateSnapshot() throws Exception {
346         new JavaTestKit(getSystem()) {{
347             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
348                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
349
350             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
351                     YangInstanceIdentifier.builder().build());
352
353             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
354                     "testOnReceiveCreateSnapshot");
355
356             watch(transaction);
357
358             transaction.tell(CreateSnapshot.INSTANCE, getRef());
359
360             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
361
362             assertNotNull("getSnapshot is null", reply.getSnapshot());
363
364             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
365                     reply.getSnapshot());
366
367             assertEquals("Root node", expectedRoot, actualRoot);
368
369             expectTerminated(duration("3 seconds"), transaction);
370         }};
371     }
372
373     @Test
374     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
375         new JavaTestKit(getSystem()) {{
376             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
377                     "testReadWriteTxOnReceiveCloseTransaction");
378
379             watch(transaction);
380
381             transaction.tell(new CloseTransaction().toSerializable(), getRef());
382
383             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
384             expectTerminated(duration("3 seconds"), transaction);
385         }};
386     }
387
388     @Test
389     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
390         new JavaTestKit(getSystem()) {{
391             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
392                     "testWriteTxOnReceiveCloseTransaction");
393
394             watch(transaction);
395
396             transaction.tell(new CloseTransaction().toSerializable(), getRef());
397
398             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
399             expectTerminated(duration("3 seconds"), transaction);
400         }};
401     }
402
403     @Test
404     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
405         new JavaTestKit(getSystem()) {{
406             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
407                     "testReadOnlyTxOnReceiveCloseTransaction");
408
409             watch(transaction);
410
411             transaction.tell(new CloseTransaction().toSerializable(), getRef());
412
413             expectMsgClass(duration("3 seconds"), Terminated.class);
414         }};
415     }
416
417     @Test(expected=UnknownMessageException.class)
418     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
419         final ActorRef shard = createShard();
420         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
421                 datastoreContext, shardStats, "txn");
422         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
423
424         transaction.receive(new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null),
425                 ActorRef.noSender());
426     }
427
428     @Test
429     public void testShardTransactionInactivity() {
430
431         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
432                 500, TimeUnit.MILLISECONDS).build();
433
434         new JavaTestKit(getSystem()) {{
435             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
436                     "testShardTransactionInactivity");
437
438             watch(transaction);
439
440             expectMsgClass(duration("3 seconds"), Terminated.class);
441         }};
442     }
443
444     @Test
445     public void testOnReceivePreBoronReadData() throws Exception {
446         new JavaTestKit(getSystem()) {{
447             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
448                     "testOnReceivePreBoronReadData");
449
450             transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
451                     toSerializable(), getRef());
452
453             Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
454             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
455         }};
456     }
457
458     @Test
459     public void testOnReceivePreBoronDataExists() throws Exception {
460         new JavaTestKit(getSystem()) {{
461             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
462                     "testOnReceivePreBoronDataExists");
463
464             transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
465                     toSerializable(), getRef());
466
467             Object replySerialized = expectMsgClass(duration("5 seconds"),
468                     ShardTransactionMessages.DataExistsReply.class);
469             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
470         }};
471     }
472
473     public static class TestException extends RuntimeException {
474         private static final long serialVersionUID = 1L;
475     }
476 }