51e46c000cd316ab54f2305d445f313835448434
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
35 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
41 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
44 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
45 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
46 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
52 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
53 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55
56 public class ShardTransactionTest extends AbstractActorTest {
57
58     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
59     private static final TransactionType RO = TransactionType.READ_ONLY;
60     private static final TransactionType RW = TransactionType.READ_WRITE;
61     private static final TransactionType WO = TransactionType.WRITE_ONLY;
62
63     private static final ShardIdentifier SHARD_IDENTIFIER =
64         ShardIdentifier.builder().memberName("member-1")
65             .shardName("inventory").type("config").build();
66
67     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
68
69     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
70
71     private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
72
73     private int txCounter = 0;
74
75     private ActorRef createShard() {
76         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
77                 schemaContext(TestModel.createTestContext()).props());
78         ShardTestKit.waitUntilLeader(shard);
79         return shard;
80     }
81
82     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
83         return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
84     }
85
86     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
87         return newTransactionActor(type, transaction, null, name, version);
88     }
89
90     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
91         return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
92     }
93
94     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
95             short version) {
96         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
97                 datastoreContext, shardStats, "txn", version);
98         return getSystem().actorOf(props, name);
99     }
100
101     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
102         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
103     }
104
105     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
106         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
107     }
108
109     @Test
110     public void testOnReceiveReadData() throws Exception {
111         new JavaTestKit(getSystem()) {{
112             final ActorRef shard = createShard();
113
114             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
115
116             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
117         }
118
119         private void testOnReceiveReadData(final ActorRef transaction) {
120             //serialized read
121             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
122                 getRef());
123
124             Object replySerialized =
125                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
126
127             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
128
129             // unserialized read
130             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
131
132             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
133
134             assertNotNull(reply.getNormalizedNode());
135         }};
136     }
137
138     @Test
139     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
140         new JavaTestKit(getSystem()) {{
141             final ActorRef shard = createShard();
142
143             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
144                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
145
146             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
147                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
148         }
149
150         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
151             // serialized read
152             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
153
154             Object replySerialized =
155                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
156
157             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
158
159             // unserialized read
160             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
161
162             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
163
164             assertTrue(reply.getNormalizedNode() == null);
165         }};
166     }
167
168     @Test
169     public void testOnReceiveDataExistsPositive() throws Exception {
170         new JavaTestKit(getSystem()) {{
171             final ActorRef shard = createShard();
172
173             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
174                     "testDataExistsPositiveRO"));
175
176             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
177                     "testDataExistsPositiveRW"));
178         }
179
180         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
181             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
182                 getRef());
183
184             ShardTransactionMessages.DataExistsReply replySerialized =
185                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
186
187             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
188
189             // unserialized read
190             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
191
192             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
193
194             assertTrue(reply.exists());
195         }};
196     }
197
198     @Test
199     public void testOnReceiveDataExistsNegative() throws Exception {
200         new JavaTestKit(getSystem()) {{
201             final ActorRef shard = createShard();
202
203             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
204                     "testDataExistsNegativeRO"));
205
206             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
207                     "testDataExistsNegativeRW"));
208         }
209
210         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
211             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
212
213             ShardTransactionMessages.DataExistsReply replySerialized =
214                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
215
216             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
217
218             // unserialized read
219             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
220
221             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
222
223             assertFalse(reply.exists());
224         }};
225     }
226
227     @Test
228     public void testOnReceiveBatchedModifications() throws Exception {
229         new JavaTestKit(getSystem()) {{
230
231             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
232             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
233             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
234             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
235
236             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
237             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
238                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
239                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
240
241             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
242             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
243                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
244
245             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
246
247             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
248             batched.addModification(new WriteModification(writePath, writeData));
249             batched.addModification(new MergeModification(mergePath, mergeData));
250             batched.addModification(new DeleteModification(deletePath));
251
252             transaction.tell(batched, getRef());
253
254             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
255             assertEquals("getNumBatched", 3, reply.getNumBatched());
256
257             InOrder inOrder = Mockito.inOrder(mockModification);
258             inOrder.verify(mockModification).write(writePath, writeData);
259             inOrder.verify(mockModification).merge(mergePath, mergeData);
260             inOrder.verify(mockModification).delete(deletePath);
261         }};
262     }
263
264     @Test
265     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
266         new JavaTestKit(getSystem()) {{
267
268             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
269                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
270
271             JavaTestKit watcher = new JavaTestKit(getSystem());
272             watcher.watch(transaction);
273
274             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
275             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
276                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
277                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
278
279             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
280             batched.addModification(new WriteModification(writePath, writeData));
281
282             transaction.tell(batched, getRef());
283             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
284             assertEquals("getNumBatched", 1, reply.getNumBatched());
285
286             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
287             batched.setReady(true);
288             batched.setTotalMessagesSent(2);
289
290             transaction.tell(batched, getRef());
291             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
292             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
293         }};
294     }
295
296     @Test
297     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
298         new JavaTestKit(getSystem()) {{
299
300             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
301                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
302
303             JavaTestKit watcher = new JavaTestKit(getSystem());
304             watcher.watch(transaction);
305
306             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
307             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
308                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
309                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
310
311             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
312             batched.addModification(new WriteModification(writePath, writeData));
313             batched.setReady(true);
314             batched.setDoCommitOnReady(true);
315             batched.setTotalMessagesSent(1);
316
317             transaction.tell(batched, getRef());
318             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
319             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
320         }};
321     }
322
323     @Test(expected=TestException.class)
324     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
325         new JavaTestKit(getSystem()) {{
326
327             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
328             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
329             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
330             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
331                     "testOnReceiveBatchedModificationsFailure");
332
333             JavaTestKit watcher = new JavaTestKit(getSystem());
334             watcher.watch(transaction);
335
336             YangInstanceIdentifier path = TestModel.TEST_PATH;
337             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
338
339             doThrow(new TestException()).when(mockModification).write(path, node);
340
341             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
342             batched.addModification(new WriteModification(path, node));
343
344             transaction.tell(batched, getRef());
345             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
346
347             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
348             batched.setReady(true);
349             batched.setTotalMessagesSent(2);
350
351             transaction.tell(batched, getRef());
352             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
353             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
354
355             if(failure != null) {
356                 throw failure.cause();
357             }
358         }};
359     }
360
361     @Test(expected=IllegalStateException.class)
362     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
363         new JavaTestKit(getSystem()) {{
364
365             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
366                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
367
368             JavaTestKit watcher = new JavaTestKit(getSystem());
369             watcher.watch(transaction);
370
371             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
372             batched.setReady(true);
373             batched.setTotalMessagesSent(2);
374
375             transaction.tell(batched, getRef());
376
377             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
378             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
379
380             if(failure != null) {
381                 throw failure.cause();
382             }
383         }};
384     }
385
386     @Test
387     public void testOnReceiveCreateSnapshot() throws Exception {
388         new JavaTestKit(getSystem()) {{
389             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
390                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
391
392             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
393                     YangInstanceIdentifier.builder().build());
394
395             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
396                     "testOnReceiveCreateSnapshot");
397
398             watch(transaction);
399
400             transaction.tell(CreateSnapshot.INSTANCE, getRef());
401
402             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
403
404             assertNotNull("getSnapshot is null", reply.getSnapshot());
405
406             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
407                     reply.getSnapshot());
408
409             assertEquals("Root node", expectedRoot, actualRoot);
410
411             expectTerminated(duration("3 seconds"), transaction);
412         }};
413     }
414
415     @Test
416     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
417         new JavaTestKit(getSystem()) {{
418             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
419                     "testReadWriteTxOnReceiveCloseTransaction");
420
421             watch(transaction);
422
423             transaction.tell(new CloseTransaction().toSerializable(), getRef());
424
425             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
426             expectTerminated(duration("3 seconds"), transaction);
427         }};
428     }
429
430     @Test
431     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
432         new JavaTestKit(getSystem()) {{
433             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
434                     "testWriteTxOnReceiveCloseTransaction");
435
436             watch(transaction);
437
438             transaction.tell(new CloseTransaction().toSerializable(), getRef());
439
440             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
441             expectTerminated(duration("3 seconds"), transaction);
442         }};
443     }
444
445     @Test
446     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
447         new JavaTestKit(getSystem()) {{
448             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
449                     "testReadOnlyTxOnReceiveCloseTransaction");
450
451             watch(transaction);
452
453             transaction.tell(new CloseTransaction().toSerializable(), getRef());
454
455             expectMsgClass(duration("3 seconds"), Terminated.class);
456         }};
457     }
458
459     @Test(expected=UnknownMessageException.class)
460     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
461         final ActorRef shard = createShard();
462         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
463                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
464         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
465
466         transaction.receive(new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null),
467                 ActorRef.noSender());
468     }
469
470     @Test
471     public void testShardTransactionInactivity() {
472
473         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
474                 500, TimeUnit.MILLISECONDS).build();
475
476         new JavaTestKit(getSystem()) {{
477             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
478                     "testShardTransactionInactivity");
479
480             watch(transaction);
481
482             expectMsgClass(duration("3 seconds"), Terminated.class);
483         }};
484     }
485
486     public static class TestException extends RuntimeException {
487         private static final long serialVersionUID = 1L;
488     }
489 }