e0b34925ab6116708e7b1f257ca959b815d0079e
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.access.concepts.MemberName;
27 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
28 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
29 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
44 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
45 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
46 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
47 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56
57 public class ShardTransactionTest extends AbstractActorTest {
58
59     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
60     private static final TransactionType RO = TransactionType.READ_ONLY;
61     private static final TransactionType RW = TransactionType.READ_WRITE;
62     private static final TransactionType WO = TransactionType.WRITE_ONLY;
63
64     private static final ShardIdentifier SHARD_IDENTIFIER =
65         ShardIdentifier.create("inventory", MemberName.forName("member-1"), "config");
66
67     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
68
69     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
70
71     private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
72
73     private int txCounter = 0;
74
75     private ActorRef createShard() {
76         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
77                 schemaContext(TestModel.createTestContext()).props());
78         ShardTestKit.waitUntilLeader(shard);
79         return shard;
80     }
81
82     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
83         return newTransactionActor(type, transaction, null, name);
84     }
85
86     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
87         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
88                 datastoreContext, shardStats, "txn");
89         return getSystem().actorOf(props, name);
90     }
91
92     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
93         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
94     }
95
96     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
97         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
98     }
99
100     @Test
101     public void testOnReceiveReadData() throws Exception {
102         new JavaTestKit(getSystem()) {{
103             final ActorRef shard = createShard();
104
105             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
106
107             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
108         }
109
110         private void testOnReceiveReadData(final ActorRef transaction) {
111             transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY,
112                     DataStoreVersions.CURRENT_VERSION), getRef());
113
114             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
115
116             assertNotNull(reply.getNormalizedNode());
117         }};
118     }
119
120     @Test
121     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
122         new JavaTestKit(getSystem()) {{
123             final ActorRef shard = createShard();
124
125             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
126                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
127
128             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
129                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
130         }
131
132         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
133             transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
134
135             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
136
137             assertTrue(reply.getNormalizedNode() == null);
138         }};
139     }
140
141     @Test
142     public void testOnReceiveDataExistsPositive() throws Exception {
143         new JavaTestKit(getSystem()) {{
144             final ActorRef shard = createShard();
145
146             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
147                     "testDataExistsPositiveRO"));
148
149             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
150                     "testDataExistsPositiveRW"));
151         }
152
153         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
154             transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY,
155                     DataStoreVersions.CURRENT_VERSION), getRef());
156
157             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
158
159             assertTrue(reply.exists());
160         }};
161     }
162
163     @Test
164     public void testOnReceiveDataExistsNegative() throws Exception {
165         new JavaTestKit(getSystem()) {{
166             final ActorRef shard = createShard();
167
168             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
169                     "testDataExistsNegativeRO"));
170
171             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
172                     "testDataExistsNegativeRW"));
173         }
174
175         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
176             transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
177
178             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
179
180             assertFalse(reply.exists());
181         }};
182     }
183
184     @Test
185     public void testOnReceiveBatchedModifications() throws Exception {
186         new JavaTestKit(getSystem()) {{
187
188             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
189             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
190             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
191             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
192
193             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
194             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
195                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
196                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
197
198             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
199             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
200                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
201
202             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
203
204             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
205             batched.addModification(new WriteModification(writePath, writeData));
206             batched.addModification(new MergeModification(mergePath, mergeData));
207             batched.addModification(new DeleteModification(deletePath));
208
209             transaction.tell(batched, getRef());
210
211             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
212             assertEquals("getNumBatched", 3, reply.getNumBatched());
213
214             InOrder inOrder = Mockito.inOrder(mockModification);
215             inOrder.verify(mockModification).write(writePath, writeData);
216             inOrder.verify(mockModification).merge(mergePath, mergeData);
217             inOrder.verify(mockModification).delete(deletePath);
218         }};
219     }
220
221     @Test
222     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
223         new JavaTestKit(getSystem()) {{
224
225             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
226                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
227
228             JavaTestKit watcher = new JavaTestKit(getSystem());
229             watcher.watch(transaction);
230
231             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
232             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
233                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
234                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
235
236             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
237             batched.addModification(new WriteModification(writePath, writeData));
238
239             transaction.tell(batched, getRef());
240             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
241             assertEquals("getNumBatched", 1, reply.getNumBatched());
242
243             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
244             batched.setReady(true);
245             batched.setTotalMessagesSent(2);
246
247             transaction.tell(batched, getRef());
248             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
249             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
250         }};
251     }
252
253     @Test
254     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
255         new JavaTestKit(getSystem()) {{
256
257             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
258                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
259
260             JavaTestKit watcher = new JavaTestKit(getSystem());
261             watcher.watch(transaction);
262
263             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
264             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
265                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
266                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
267
268             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
269             batched.addModification(new WriteModification(writePath, writeData));
270             batched.setReady(true);
271             batched.setDoCommitOnReady(true);
272             batched.setTotalMessagesSent(1);
273
274             transaction.tell(batched, getRef());
275             expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
276             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
277         }};
278     }
279
280     @Test(expected=TestException.class)
281     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
282         new JavaTestKit(getSystem()) {{
283
284             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
285             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
286             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
287             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
288                     "testOnReceiveBatchedModificationsFailure");
289
290             JavaTestKit watcher = new JavaTestKit(getSystem());
291             watcher.watch(transaction);
292
293             YangInstanceIdentifier path = TestModel.TEST_PATH;
294             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
295
296             doThrow(new TestException()).when(mockModification).write(path, node);
297
298             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
299             batched.addModification(new WriteModification(path, node));
300
301             transaction.tell(batched, getRef());
302             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
303
304             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
305             batched.setReady(true);
306             batched.setTotalMessagesSent(2);
307
308             transaction.tell(batched, getRef());
309             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
310             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
311
312             if(failure != null) {
313                 throw failure.cause();
314             }
315         }};
316     }
317
318     @Test(expected=IllegalStateException.class)
319     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
320         new JavaTestKit(getSystem()) {{
321
322             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
323                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
324
325             JavaTestKit watcher = new JavaTestKit(getSystem());
326             watcher.watch(transaction);
327
328             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
329             batched.setReady(true);
330             batched.setTotalMessagesSent(2);
331
332             transaction.tell(batched, getRef());
333
334             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
335             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
336
337             if(failure != null) {
338                 throw failure.cause();
339             }
340         }};
341     }
342
343     @Test
344     public void testOnReceiveCreateSnapshot() throws Exception {
345         new JavaTestKit(getSystem()) {{
346             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
347                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
348
349             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
350                     YangInstanceIdentifier.EMPTY);
351
352             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
353                     "testOnReceiveCreateSnapshot");
354
355             watch(transaction);
356
357             transaction.tell(CreateSnapshot.INSTANCE, getRef());
358
359             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
360
361             assertNotNull("getSnapshot is null", reply.getSnapshot());
362
363             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
364                     reply.getSnapshot());
365
366             assertEquals("Root node", expectedRoot, actualRoot);
367
368             expectTerminated(duration("3 seconds"), transaction);
369         }};
370     }
371
372     @Test
373     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
374         new JavaTestKit(getSystem()) {{
375             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
376                     "testReadWriteTxOnReceiveCloseTransaction");
377
378             watch(transaction);
379
380             transaction.tell(new CloseTransaction().toSerializable(), getRef());
381
382             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
383             expectTerminated(duration("3 seconds"), transaction);
384         }};
385     }
386
387     @Test
388     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
389         new JavaTestKit(getSystem()) {{
390             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
391                     "testWriteTxOnReceiveCloseTransaction");
392
393             watch(transaction);
394
395             transaction.tell(new CloseTransaction().toSerializable(), getRef());
396
397             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
398             expectTerminated(duration("3 seconds"), transaction);
399         }};
400     }
401
402     @Test
403     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
404         new JavaTestKit(getSystem()) {{
405             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
406                     "testReadOnlyTxOnReceiveCloseTransaction");
407
408             watch(transaction);
409
410             transaction.tell(new CloseTransaction().toSerializable(), getRef());
411
412             expectMsgClass(duration("3 seconds"), Terminated.class);
413         }};
414     }
415
416     @Test(expected=UnknownMessageException.class)
417     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
418         final ActorRef shard = createShard();
419         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
420                 datastoreContext, shardStats, "txn");
421         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
422
423         transaction.receive(new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null),
424                 ActorRef.noSender());
425     }
426
427     @Test
428     public void testShardTransactionInactivity() {
429
430         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
431                 500, TimeUnit.MILLISECONDS).build();
432
433         new JavaTestKit(getSystem()) {{
434             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
435                     "testShardTransactionInactivity");
436
437             watch(transaction);
438
439             expectMsgClass(duration("3 seconds"), Terminated.class);
440         }};
441     }
442
443     @Test
444     public void testOnReceivePreBoronReadData() throws Exception {
445         new JavaTestKit(getSystem()) {{
446             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
447                     "testOnReceivePreBoronReadData");
448
449             transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
450                     toSerializable(), getRef());
451
452             Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
453             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
454         }};
455     }
456
457     @Test
458     public void testOnReceivePreBoronDataExists() throws Exception {
459         new JavaTestKit(getSystem()) {{
460             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
461                     "testOnReceivePreBoronDataExists");
462
463             transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
464                     toSerializable(), getRef());
465
466             Object replySerialized = expectMsgClass(duration("5 seconds"),
467                     ShardTransactionMessages.DataExistsReply.class);
468             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
469         }};
470     }
471
472     public static class TestException extends RuntimeException {
473         private static final long serialVersionUID = 1L;
474     }
475 }