5333321a426842c428370f42e045ab16f0909234
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
35 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
41 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
44 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
45 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
46 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
52 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
53 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55
56 public class ShardTransactionTest extends AbstractActorTest {
57
58     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
59     private static final TransactionType RO = TransactionType.READ_ONLY;
60     private static final TransactionType RW = TransactionType.READ_WRITE;
61     private static final TransactionType WO = TransactionType.WRITE_ONLY;
62
63     private static final ShardIdentifier SHARD_IDENTIFIER =
64         ShardIdentifier.builder().memberName("member-1")
65             .shardName("inventory").type("config").build();
66
67     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
68
69     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
70
71     private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
72
73     private int txCounter = 0;
74
75     private ActorRef createShard() {
76         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
77                 schemaContext(TestModel.createTestContext()).props());
78         ShardTestKit.waitUntilLeader(shard);
79         return shard;
80     }
81
82     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
83         return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
84     }
85
86     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
87         return newTransactionActor(type, transaction, null, name, version);
88     }
89
90     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
91         return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
92     }
93
94     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
95             short version) {
96         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
97                 datastoreContext, shardStats, "txn", version);
98         return getSystem().actorOf(props, name);
99     }
100
101     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
102         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
103     }
104
105     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
106         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
107     }
108
109     @Test
110     public void testOnReceiveReadData() throws Exception {
111         new JavaTestKit(getSystem()) {{
112             final ActorRef shard = createShard();
113
114             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
115
116             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
117         }
118
119         private void testOnReceiveReadData(final ActorRef transaction) {
120             //serialized read
121             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
122                 getRef());
123
124             Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
125
126             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
127
128             // unserialized read
129             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
130
131             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
132
133             assertNotNull(reply.getNormalizedNode());
134         }};
135     }
136
137     @Test
138     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
139         new JavaTestKit(getSystem()) {{
140             final ActorRef shard = createShard();
141
142             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
143                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
144
145             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
146                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
147         }
148
149         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
150             // serialized read
151             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
152
153             Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
154
155             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
156
157             // unserialized read
158             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
159
160             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
161
162             assertTrue(reply.getNormalizedNode() == null);
163         }};
164     }
165
166     @Test
167     public void testOnReceiveDataExistsPositive() throws Exception {
168         new JavaTestKit(getSystem()) {{
169             final ActorRef shard = createShard();
170
171             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
172                     "testDataExistsPositiveRO"));
173
174             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
175                     "testDataExistsPositiveRW"));
176         }
177
178         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
179             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
180                 getRef());
181
182             Object replySerialized = expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
183             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
184
185             // unserialized read
186             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
187
188             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
189
190             assertTrue(reply.exists());
191         }};
192     }
193
194     @Test
195     public void testOnReceiveDataExistsNegative() throws Exception {
196         new JavaTestKit(getSystem()) {{
197             final ActorRef shard = createShard();
198
199             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
200                     "testDataExistsNegativeRO"));
201
202             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
203                     "testDataExistsNegativeRW"));
204         }
205
206         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
207             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
208
209             Object replySerialized = expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
210             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
211
212             // unserialized read
213             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
214
215             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
216
217             assertFalse(reply.exists());
218         }};
219     }
220
221     @Test
222     public void testOnReceiveBatchedModifications() throws Exception {
223         new JavaTestKit(getSystem()) {{
224
225             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
226             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
227             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
228             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
229
230             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
231             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
232                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
233                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
234
235             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
236             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
237                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
238
239             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
240
241             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
242             batched.addModification(new WriteModification(writePath, writeData));
243             batched.addModification(new MergeModification(mergePath, mergeData));
244             batched.addModification(new DeleteModification(deletePath));
245
246             transaction.tell(batched, getRef());
247
248             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
249             assertEquals("getNumBatched", 3, reply.getNumBatched());
250
251             InOrder inOrder = Mockito.inOrder(mockModification);
252             inOrder.verify(mockModification).write(writePath, writeData);
253             inOrder.verify(mockModification).merge(mergePath, mergeData);
254             inOrder.verify(mockModification).delete(deletePath);
255         }};
256     }
257
258     @Test
259     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
260         new JavaTestKit(getSystem()) {{
261
262             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
263                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
264
265             JavaTestKit watcher = new JavaTestKit(getSystem());
266             watcher.watch(transaction);
267
268             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
269             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
270                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
271                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
272
273             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
274             batched.addModification(new WriteModification(writePath, writeData));
275
276             transaction.tell(batched, getRef());
277             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
278             assertEquals("getNumBatched", 1, reply.getNumBatched());
279
280             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
281             batched.setReady(true);
282             batched.setTotalMessagesSent(2);
283
284             transaction.tell(batched, getRef());
285             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
286             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
287         }};
288     }
289
290     @Test
291     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
292         new JavaTestKit(getSystem()) {{
293
294             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
295                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
296
297             JavaTestKit watcher = new JavaTestKit(getSystem());
298             watcher.watch(transaction);
299
300             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
301             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
302                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
303                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
304
305             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
306             batched.addModification(new WriteModification(writePath, writeData));
307             batched.setReady(true);
308             batched.setDoCommitOnReady(true);
309             batched.setTotalMessagesSent(1);
310
311             transaction.tell(batched, getRef());
312             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
313             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
314         }};
315     }
316
317     @Test(expected=TestException.class)
318     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
319         new JavaTestKit(getSystem()) {{
320
321             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
322             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
323             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
324             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
325                     "testOnReceiveBatchedModificationsFailure");
326
327             JavaTestKit watcher = new JavaTestKit(getSystem());
328             watcher.watch(transaction);
329
330             YangInstanceIdentifier path = TestModel.TEST_PATH;
331             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
332
333             doThrow(new TestException()).when(mockModification).write(path, node);
334
335             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
336             batched.addModification(new WriteModification(path, node));
337
338             transaction.tell(batched, getRef());
339             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
340
341             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
342             batched.setReady(true);
343             batched.setTotalMessagesSent(2);
344
345             transaction.tell(batched, getRef());
346             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
347             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
348
349             if(failure != null) {
350                 throw failure.cause();
351             }
352         }};
353     }
354
355     @Test(expected=IllegalStateException.class)
356     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
357         new JavaTestKit(getSystem()) {{
358
359             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
360                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
361
362             JavaTestKit watcher = new JavaTestKit(getSystem());
363             watcher.watch(transaction);
364
365             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
366             batched.setReady(true);
367             batched.setTotalMessagesSent(2);
368
369             transaction.tell(batched, getRef());
370
371             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
372             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
373
374             if(failure != null) {
375                 throw failure.cause();
376             }
377         }};
378     }
379
380     @Test
381     public void testOnReceiveCreateSnapshot() throws Exception {
382         new JavaTestKit(getSystem()) {{
383             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
384                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
385
386             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
387                     YangInstanceIdentifier.builder().build());
388
389             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
390                     "testOnReceiveCreateSnapshot");
391
392             watch(transaction);
393
394             transaction.tell(CreateSnapshot.INSTANCE, getRef());
395
396             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
397
398             assertNotNull("getSnapshot is null", reply.getSnapshot());
399
400             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
401                     reply.getSnapshot());
402
403             assertEquals("Root node", expectedRoot, actualRoot);
404
405             expectTerminated(duration("3 seconds"), transaction);
406         }};
407     }
408
409     @Test
410     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
411         new JavaTestKit(getSystem()) {{
412             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
413                     "testReadWriteTxOnReceiveCloseTransaction");
414
415             watch(transaction);
416
417             transaction.tell(new CloseTransaction().toSerializable(), getRef());
418
419             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
420             expectTerminated(duration("3 seconds"), transaction);
421         }};
422     }
423
424     @Test
425     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
426         new JavaTestKit(getSystem()) {{
427             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
428                     "testWriteTxOnReceiveCloseTransaction");
429
430             watch(transaction);
431
432             transaction.tell(new CloseTransaction().toSerializable(), getRef());
433
434             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
435             expectTerminated(duration("3 seconds"), transaction);
436         }};
437     }
438
439     @Test
440     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
441         new JavaTestKit(getSystem()) {{
442             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
443                     "testReadOnlyTxOnReceiveCloseTransaction");
444
445             watch(transaction);
446
447             transaction.tell(new CloseTransaction().toSerializable(), getRef());
448
449             expectMsgClass(duration("3 seconds"), Terminated.class);
450         }};
451     }
452
453     @Test(expected=UnknownMessageException.class)
454     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
455         final ActorRef shard = createShard();
456         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
457                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
458         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
459
460         transaction.receive(new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null),
461                 ActorRef.noSender());
462     }
463
464     @Test
465     public void testShardTransactionInactivity() {
466
467         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
468                 500, TimeUnit.MILLISECONDS).build();
469
470         new JavaTestKit(getSystem()) {{
471             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
472                     "testShardTransactionInactivity");
473
474             watch(transaction);
475
476             expectMsgClass(duration("3 seconds"), Terminated.class);
477         }};
478     }
479
480     public static class TestException extends RuntimeException {
481         private static final long serialVersionUID = 1L;
482     }
483 }