0cb9046a65289fee6bcbb850acc47ab5f7b63fff
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Ignore;
24 import org.junit.Test;
25 import org.mockito.InOrder;
26 import org.mockito.Mockito;
27 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
28 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
30 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
38 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
46 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
47 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56
57 public class ShardTransactionTest extends AbstractActorTest {
58
59     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
60     private static final TransactionType RO = TransactionType.READ_ONLY;
61     private static final TransactionType RW = TransactionType.READ_WRITE;
62     private static final TransactionType WO = TransactionType.WRITE_ONLY;
63
64     private static final ShardIdentifier SHARD_IDENTIFIER =
65         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
66
67
68     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
69
70     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
71
72     private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
73
74     private ActorRef createShard() {
75         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
76                 schemaContext(TestModel.createTestContext()).props());
77         ShardTestKit.waitUntilLeader(shard);
78         return shard;
79     }
80
81     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
82         return newTransactionActor(type, transaction, null, name);
83     }
84
85     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
86         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
87                 datastoreContext, shardStats);
88         return getSystem().actorOf(props, name);
89     }
90
91     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
92         return store.newReadOnlyTransaction(nextTransactionId());
93     }
94
95     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
96         return store.newReadWriteTransaction(nextTransactionId());
97     }
98
99     @Test
100     public void testOnReceiveReadData() throws Exception {
101         new JavaTestKit(getSystem()) {{
102             final ActorRef shard = createShard();
103
104             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
105
106             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
107         }
108
109         private void testOnReceiveReadData(final ActorRef transaction) {
110             transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY,
111                     DataStoreVersions.CURRENT_VERSION), getRef());
112
113             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
114
115             assertNotNull(reply.getNormalizedNode());
116         }};
117     }
118
119     @Test
120     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
121         new JavaTestKit(getSystem()) {{
122             final ActorRef shard = createShard();
123
124             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
125                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
126
127             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
128                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
129         }
130
131         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
132             transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
133
134             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
135
136             assertTrue(reply.getNormalizedNode() == null);
137         }};
138     }
139
140     @Test
141     public void testOnReceiveDataExistsPositive() throws Exception {
142         new JavaTestKit(getSystem()) {{
143             final ActorRef shard = createShard();
144
145             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
146                     "testDataExistsPositiveRO"));
147
148             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
149                     "testDataExistsPositiveRW"));
150         }
151
152         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
153             transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY,
154                     DataStoreVersions.CURRENT_VERSION), getRef());
155
156             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
157
158             assertTrue(reply.exists());
159         }};
160     }
161
162     @Test
163     public void testOnReceiveDataExistsNegative() throws Exception {
164         new JavaTestKit(getSystem()) {{
165             final ActorRef shard = createShard();
166
167             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
168                     "testDataExistsNegativeRO"));
169
170             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
171                     "testDataExistsNegativeRW"));
172         }
173
174         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
175             transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
176
177             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
178
179             assertFalse(reply.exists());
180         }};
181     }
182
183     @Test
184     public void testOnReceiveBatchedModifications() throws Exception {
185         new JavaTestKit(getSystem()) {{
186
187             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
188             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
189             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
190                 nextTransactionId(), mockModification);
191             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
192
193             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
194             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
195                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
196                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
197
198             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
199             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
200                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
201
202             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
203
204             BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
205             batched.addModification(new WriteModification(writePath, writeData));
206             batched.addModification(new MergeModification(mergePath, mergeData));
207             batched.addModification(new DeleteModification(deletePath));
208
209             transaction.tell(batched, getRef());
210
211             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
212             assertEquals("getNumBatched", 3, reply.getNumBatched());
213
214             InOrder inOrder = Mockito.inOrder(mockModification);
215             inOrder.verify(mockModification).write(writePath, writeData);
216             inOrder.verify(mockModification).merge(mergePath, mergeData);
217             inOrder.verify(mockModification).delete(deletePath);
218         }};
219     }
220
221     @Test
222     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
223         new JavaTestKit(getSystem()) {{
224
225             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
226                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
227
228             JavaTestKit watcher = new JavaTestKit(getSystem());
229             watcher.watch(transaction);
230
231             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
232             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
233                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
234                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
235
236             final TransactionIdentifier tx1 = nextTransactionId();
237             BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
238             batched.addModification(new WriteModification(writePath, writeData));
239
240             transaction.tell(batched, getRef());
241             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
242             assertEquals("getNumBatched", 1, reply.getNumBatched());
243
244             batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
245             batched.setReady(true);
246             batched.setTotalMessagesSent(2);
247
248             transaction.tell(batched, getRef());
249             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
250             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
251         }};
252     }
253
254     @Test
255     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
256         new JavaTestKit(getSystem()) {{
257
258             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
259                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
260
261             JavaTestKit watcher = new JavaTestKit(getSystem());
262             watcher.watch(transaction);
263
264             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
265             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
266                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
267                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
268
269             BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
270             batched.addModification(new WriteModification(writePath, writeData));
271             batched.setReady(true);
272             batched.setDoCommitOnReady(true);
273             batched.setTotalMessagesSent(1);
274
275             transaction.tell(batched, getRef());
276             expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
277             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
278         }};
279     }
280
281     @Test(expected=TestException.class)
282     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
283         new JavaTestKit(getSystem()) {{
284
285             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
286             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
287             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
288                 nextTransactionId(), mockModification);
289             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
290                     "testOnReceiveBatchedModificationsFailure");
291
292             JavaTestKit watcher = new JavaTestKit(getSystem());
293             watcher.watch(transaction);
294
295             YangInstanceIdentifier path = TestModel.TEST_PATH;
296             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
297
298             doThrow(new TestException()).when(mockModification).write(path, node);
299
300             final TransactionIdentifier tx1 = nextTransactionId();
301             BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
302             batched.addModification(new WriteModification(path, node));
303
304             transaction.tell(batched, getRef());
305             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
306
307             batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
308             batched.setReady(true);
309             batched.setTotalMessagesSent(2);
310
311             transaction.tell(batched, getRef());
312             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
313             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
314
315             if(failure != null) {
316                 throw failure.cause();
317             }
318         }};
319     }
320
321     @Test(expected=IllegalStateException.class)
322     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
323         new JavaTestKit(getSystem()) {{
324
325             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
326                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
327
328             JavaTestKit watcher = new JavaTestKit(getSystem());
329             watcher.watch(transaction);
330
331             BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
332             batched.setReady(true);
333             batched.setTotalMessagesSent(2);
334
335             transaction.tell(batched, getRef());
336
337             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
338             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
339
340             if(failure != null) {
341                 throw failure.cause();
342             }
343         }};
344     }
345
346     @Test
347     public void testOnReceiveCreateSnapshot() throws Exception {
348         new JavaTestKit(getSystem()) {{
349             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
350                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
351
352             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
353                     YangInstanceIdentifier.EMPTY);
354
355             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
356                     "testOnReceiveCreateSnapshot");
357
358             watch(transaction);
359
360             transaction.tell(CreateSnapshot.INSTANCE, getRef());
361
362             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
363
364             assertNotNull("getSnapshot is null", reply.getSnapshot());
365
366             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
367                     reply.getSnapshot());
368
369             assertEquals("Root node", expectedRoot, actualRoot);
370
371             expectTerminated(duration("3 seconds"), transaction);
372         }};
373     }
374
375     @Test
376     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
377         new JavaTestKit(getSystem()) {{
378             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
379                     "testReadWriteTxOnReceiveCloseTransaction");
380
381             watch(transaction);
382
383             transaction.tell(new CloseTransaction().toSerializable(), getRef());
384
385             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
386             expectTerminated(duration("3 seconds"), transaction);
387         }};
388     }
389
390     @Test
391     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
392         new JavaTestKit(getSystem()) {{
393             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
394                     "testWriteTxOnReceiveCloseTransaction");
395
396             watch(transaction);
397
398             transaction.tell(new CloseTransaction().toSerializable(), getRef());
399
400             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
401             expectTerminated(duration("3 seconds"), transaction);
402         }};
403     }
404
405     @Test
406     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
407         new JavaTestKit(getSystem()) {{
408             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
409                     "testReadOnlyTxOnReceiveCloseTransaction");
410
411             watch(transaction);
412
413             transaction.tell(new CloseTransaction().toSerializable(), getRef());
414
415             expectMsgClass(duration("3 seconds"), Terminated.class);
416         }};
417     }
418
419     // Unknown operations are being logged
420     @Ignore
421     @Test(expected=UnknownMessageException.class)
422     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
423         final ActorRef shard = createShard();
424         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
425                 datastoreContext, shardStats);
426         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
427
428         transaction.receive(new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION),
429                 ActorRef.noSender());
430     }
431
432     @Test
433     public void testShardTransactionInactivity() {
434
435         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
436                 500, TimeUnit.MILLISECONDS).build();
437
438         new JavaTestKit(getSystem()) {{
439             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
440                     "testShardTransactionInactivity");
441
442             watch(transaction);
443
444             expectMsgClass(duration("3 seconds"), Terminated.class);
445         }};
446     }
447     public static class TestException extends RuntimeException {
448         private static final long serialVersionUID = 1L;
449     }
450 }