Deprecate ReadData/DataExists protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
35 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
41 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
44 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
45 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
46 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
52 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
53 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55
56 public class ShardTransactionTest extends AbstractActorTest {
57
58     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
59     private static final TransactionType RO = TransactionType.READ_ONLY;
60     private static final TransactionType RW = TransactionType.READ_WRITE;
61     private static final TransactionType WO = TransactionType.WRITE_ONLY;
62
63     private static final ShardIdentifier SHARD_IDENTIFIER =
64         ShardIdentifier.builder().memberName("member-1")
65             .shardName("inventory").type("config").build();
66
67     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
68
69     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
70
71     private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
72
73     private int txCounter = 0;
74
75     private ActorRef createShard() {
76         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
77                 schemaContext(TestModel.createTestContext()).props());
78         ShardTestKit.waitUntilLeader(shard);
79         return shard;
80     }
81
82     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
83         return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
84     }
85
86     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
87         return newTransactionActor(type, transaction, null, name, version);
88     }
89
90     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
91         return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
92     }
93
94     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
95             short version) {
96         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
97                 datastoreContext, shardStats, "txn", version);
98         return getSystem().actorOf(props, name);
99     }
100
101     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
102         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
103     }
104
105     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
106         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
107     }
108
109     @Test
110     public void testOnReceiveReadData() throws Exception {
111         new JavaTestKit(getSystem()) {{
112             final ActorRef shard = createShard();
113
114             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
115
116             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
117         }
118
119         private void testOnReceiveReadData(final ActorRef transaction) {
120             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build(),
121                     DataStoreVersions.CURRENT_VERSION),getRef());
122
123             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
124
125             assertNotNull(reply.getNormalizedNode());
126         }};
127     }
128
129     @Test
130     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
131         new JavaTestKit(getSystem()) {{
132             final ActorRef shard = createShard();
133
134             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
135                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
136
137             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
138                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
139         }
140
141         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
142             transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
143
144             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
145
146             assertTrue(reply.getNormalizedNode() == null);
147         }};
148     }
149
150     @Test
151     public void testOnReceiveDataExistsPositive() throws Exception {
152         new JavaTestKit(getSystem()) {{
153             final ActorRef shard = createShard();
154
155             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
156                     "testDataExistsPositiveRO"));
157
158             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
159                     "testDataExistsPositiveRW"));
160         }
161
162         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
163             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build(),
164                     DataStoreVersions.CURRENT_VERSION),getRef());
165
166             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
167
168             assertTrue(reply.exists());
169         }};
170     }
171
172     @Test
173     public void testOnReceiveDataExistsNegative() throws Exception {
174         new JavaTestKit(getSystem()) {{
175             final ActorRef shard = createShard();
176
177             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
178                     "testDataExistsNegativeRO"));
179
180             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
181                     "testDataExistsNegativeRW"));
182         }
183
184         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
185             transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
186
187             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
188
189             assertFalse(reply.exists());
190         }};
191     }
192
193     @Test
194     public void testOnReceiveBatchedModifications() throws Exception {
195         new JavaTestKit(getSystem()) {{
196
197             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
198             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
199             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
200             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
201
202             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
203             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
204                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
205                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
206
207             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
208             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
209                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
210
211             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
212
213             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
214             batched.addModification(new WriteModification(writePath, writeData));
215             batched.addModification(new MergeModification(mergePath, mergeData));
216             batched.addModification(new DeleteModification(deletePath));
217
218             transaction.tell(batched, getRef());
219
220             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
221             assertEquals("getNumBatched", 3, reply.getNumBatched());
222
223             InOrder inOrder = Mockito.inOrder(mockModification);
224             inOrder.verify(mockModification).write(writePath, writeData);
225             inOrder.verify(mockModification).merge(mergePath, mergeData);
226             inOrder.verify(mockModification).delete(deletePath);
227         }};
228     }
229
230     @Test
231     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
232         new JavaTestKit(getSystem()) {{
233
234             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
235                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
236
237             JavaTestKit watcher = new JavaTestKit(getSystem());
238             watcher.watch(transaction);
239
240             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
241             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
242                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
243                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
244
245             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
246             batched.addModification(new WriteModification(writePath, writeData));
247
248             transaction.tell(batched, getRef());
249             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
250             assertEquals("getNumBatched", 1, reply.getNumBatched());
251
252             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
253             batched.setReady(true);
254             batched.setTotalMessagesSent(2);
255
256             transaction.tell(batched, getRef());
257             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
258             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
259         }};
260     }
261
262     @Test
263     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
264         new JavaTestKit(getSystem()) {{
265
266             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
267                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
268
269             JavaTestKit watcher = new JavaTestKit(getSystem());
270             watcher.watch(transaction);
271
272             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
273             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
274                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
275                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
276
277             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
278             batched.addModification(new WriteModification(writePath, writeData));
279             batched.setReady(true);
280             batched.setDoCommitOnReady(true);
281             batched.setTotalMessagesSent(1);
282
283             transaction.tell(batched, getRef());
284             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
285             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
286         }};
287     }
288
289     @Test(expected=TestException.class)
290     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
291         new JavaTestKit(getSystem()) {{
292
293             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
294             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
295             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
296             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
297                     "testOnReceiveBatchedModificationsFailure");
298
299             JavaTestKit watcher = new JavaTestKit(getSystem());
300             watcher.watch(transaction);
301
302             YangInstanceIdentifier path = TestModel.TEST_PATH;
303             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
304
305             doThrow(new TestException()).when(mockModification).write(path, node);
306
307             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
308             batched.addModification(new WriteModification(path, node));
309
310             transaction.tell(batched, getRef());
311             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
312
313             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
314             batched.setReady(true);
315             batched.setTotalMessagesSent(2);
316
317             transaction.tell(batched, getRef());
318             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
319             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
320
321             if(failure != null) {
322                 throw failure.cause();
323             }
324         }};
325     }
326
327     @Test(expected=IllegalStateException.class)
328     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
329         new JavaTestKit(getSystem()) {{
330
331             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
332                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
333
334             JavaTestKit watcher = new JavaTestKit(getSystem());
335             watcher.watch(transaction);
336
337             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
338             batched.setReady(true);
339             batched.setTotalMessagesSent(2);
340
341             transaction.tell(batched, getRef());
342
343             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
344             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
345
346             if(failure != null) {
347                 throw failure.cause();
348             }
349         }};
350     }
351
352     @Test
353     public void testOnReceiveCreateSnapshot() throws Exception {
354         new JavaTestKit(getSystem()) {{
355             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
356                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
357
358             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
359                     YangInstanceIdentifier.builder().build());
360
361             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
362                     "testOnReceiveCreateSnapshot");
363
364             watch(transaction);
365
366             transaction.tell(CreateSnapshot.INSTANCE, getRef());
367
368             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
369
370             assertNotNull("getSnapshot is null", reply.getSnapshot());
371
372             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
373                     reply.getSnapshot());
374
375             assertEquals("Root node", expectedRoot, actualRoot);
376
377             expectTerminated(duration("3 seconds"), transaction);
378         }};
379     }
380
381     @Test
382     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
383         new JavaTestKit(getSystem()) {{
384             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
385                     "testReadWriteTxOnReceiveCloseTransaction");
386
387             watch(transaction);
388
389             transaction.tell(new CloseTransaction().toSerializable(), getRef());
390
391             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
392             expectTerminated(duration("3 seconds"), transaction);
393         }};
394     }
395
396     @Test
397     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
398         new JavaTestKit(getSystem()) {{
399             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
400                     "testWriteTxOnReceiveCloseTransaction");
401
402             watch(transaction);
403
404             transaction.tell(new CloseTransaction().toSerializable(), getRef());
405
406             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
407             expectTerminated(duration("3 seconds"), transaction);
408         }};
409     }
410
411     @Test
412     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
413         new JavaTestKit(getSystem()) {{
414             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
415                     "testReadOnlyTxOnReceiveCloseTransaction");
416
417             watch(transaction);
418
419             transaction.tell(new CloseTransaction().toSerializable(), getRef());
420
421             expectMsgClass(duration("3 seconds"), Terminated.class);
422         }};
423     }
424
425     @Test(expected=UnknownMessageException.class)
426     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
427         final ActorRef shard = createShard();
428         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
429                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
430         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
431
432         transaction.receive(new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null),
433                 ActorRef.noSender());
434     }
435
436     @Test
437     public void testShardTransactionInactivity() {
438
439         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
440                 500, TimeUnit.MILLISECONDS).build();
441
442         new JavaTestKit(getSystem()) {{
443             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
444                     "testShardTransactionInactivity");
445
446             watch(transaction);
447
448             expectMsgClass(duration("3 seconds"), Terminated.class);
449         }};
450     }
451
452     @Test
453     public void testOnReceivePreBoronReadData() throws Exception {
454         new JavaTestKit(getSystem()) {{
455             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
456                     "testOnReceivePreBoronReadData");
457
458             transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
459                     toSerializable(), getRef());
460
461             Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
462             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
463         }};
464     }
465
466     @Test
467     public void testOnReceivePreBoronDataExists() throws Exception {
468         new JavaTestKit(getSystem()) {{
469             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
470                     "testOnReceivePreBoronDataExists");
471
472             transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
473                     toSerializable(), getRef());
474
475             Object replySerialized = expectMsgClass(duration("5 seconds"),
476                     ShardTransactionMessages.DataExistsReply.class);
477             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
478         }};
479     }
480
481     public static class TestException extends RuntimeException {
482         private static final long serialVersionUID = 1L;
483     }
484 }