BUG-5280: expand ShardDataTree to cover transaction mechanics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.dispatch.Dispatchers;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.mockito.InOrder;
27 import org.mockito.Mockito;
28 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
41 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.raft.TestActorFactory;
44 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
49 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
50 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
51
52 public class ShardTransactionTest extends AbstractActorTest {
53
54     private static final TransactionType RO = TransactionType.READ_ONLY;
55     private static final TransactionType RW = TransactionType.READ_WRITE;
56     private static final TransactionType WO = TransactionType.WRITE_ONLY;
57
58     private static final ShardIdentifier SHARD_IDENTIFIER =
59         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
60
61     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
62
63     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
64
65     private TestActorRef<Shard> shard;
66     private ShardDataTree store;
67
68     @Before
69     public void setUp() {
70         shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
71                 schemaContext(TestModel.createTestContext()).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
72         ShardTestKit.waitUntilLeader(shard);
73         store = shard.underlyingActor().getDataStore();
74     }
75
76     private ActorRef newTransactionActor(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction, final String name) {
77         Props props = ShardTransaction.props(type, transaction, shard, datastoreContext, shard.underlyingActor().getShardMBean());
78         return actorFactory.createActor(props, name);
79     }
80
81     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
82         return store.newReadOnlyTransaction(nextTransactionId());
83     }
84
85     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
86         return store.newReadWriteTransaction(nextTransactionId());
87     }
88
89     @Test
90     public void testOnReceiveReadData() throws Exception {
91         new JavaTestKit(getSystem()) {{
92             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
93
94             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
95         }
96
97         private void testOnReceiveReadData(final ActorRef transaction) {
98             transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY,
99                     DataStoreVersions.CURRENT_VERSION), getRef());
100
101             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
102
103             assertNotNull(reply.getNormalizedNode());
104         }};
105     }
106
107     @Test
108     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
109         new JavaTestKit(getSystem()) {{
110             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
111                     RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO"));
112
113             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
114                     RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW"));
115         }
116
117         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
118             transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
119
120             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
121
122             assertTrue(reply.getNormalizedNode() == null);
123         }};
124     }
125
126     @Test
127     public void testOnReceiveDataExistsPositive() throws Exception {
128         new JavaTestKit(getSystem()) {{
129             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(),
130                     "testDataExistsPositiveRO"));
131
132             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(),
133                     "testDataExistsPositiveRW"));
134         }
135
136         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
137             transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY,
138                     DataStoreVersions.CURRENT_VERSION), getRef());
139
140             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
141
142             assertTrue(reply.exists());
143         }};
144     }
145
146     @Test
147     public void testOnReceiveDataExistsNegative() throws Exception {
148         new JavaTestKit(getSystem()) {{
149             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(),
150                     "testDataExistsNegativeRO"));
151
152             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(),
153                     "testDataExistsNegativeRW"));
154         }
155
156         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
157             transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
158
159             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
160
161             assertFalse(reply.exists());
162         }};
163     }
164
165     @Test
166     public void testOnReceiveBatchedModifications() throws Exception {
167         new JavaTestKit(getSystem()) {{
168
169             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
170             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
171             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
172                 nextTransactionId(), mockModification);
173             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
174
175             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
176             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
177                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
178                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
179
180             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
181             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
182                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
183
184             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
185
186             BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
187             batched.addModification(new WriteModification(writePath, writeData));
188             batched.addModification(new MergeModification(mergePath, mergeData));
189             batched.addModification(new DeleteModification(deletePath));
190
191             transaction.tell(batched, getRef());
192
193             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
194             assertEquals("getNumBatched", 3, reply.getNumBatched());
195
196             InOrder inOrder = Mockito.inOrder(mockModification);
197             inOrder.verify(mockModification).write(writePath, writeData);
198             inOrder.verify(mockModification).merge(mergePath, mergeData);
199             inOrder.verify(mockModification).delete(deletePath);
200         }};
201     }
202
203     @Test
204     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
205         new JavaTestKit(getSystem()) {{
206
207             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
208                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
209
210             JavaTestKit watcher = new JavaTestKit(getSystem());
211             watcher.watch(transaction);
212
213             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
214             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
215                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
216                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
217
218             final TransactionIdentifier tx1 = nextTransactionId();
219             BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
220             batched.addModification(new WriteModification(writePath, writeData));
221
222             transaction.tell(batched, getRef());
223             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
224             assertEquals("getNumBatched", 1, reply.getNumBatched());
225
226             batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
227             batched.setReady(true);
228             batched.setTotalMessagesSent(2);
229
230             transaction.tell(batched, getRef());
231             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
232             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
233         }};
234     }
235
236     @Test
237     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
238         new JavaTestKit(getSystem()) {{
239
240             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
241                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
242
243             JavaTestKit watcher = new JavaTestKit(getSystem());
244             watcher.watch(transaction);
245
246             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
247             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
248                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
249                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
250
251             BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
252             batched.addModification(new WriteModification(writePath, writeData));
253             batched.setReady(true);
254             batched.setDoCommitOnReady(true);
255             batched.setTotalMessagesSent(1);
256
257             transaction.tell(batched, getRef());
258             expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
259             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
260         }};
261     }
262
263     @Test(expected=TestException.class)
264     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
265         new JavaTestKit(getSystem()) {{
266
267             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
268             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
269             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
270                 nextTransactionId(), mockModification);
271             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
272                     "testOnReceiveBatchedModificationsFailure");
273
274             JavaTestKit watcher = new JavaTestKit(getSystem());
275             watcher.watch(transaction);
276
277             YangInstanceIdentifier path = TestModel.TEST_PATH;
278             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
279
280             doThrow(new TestException()).when(mockModification).write(path, node);
281
282             final TransactionIdentifier tx1 = nextTransactionId();
283             BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
284             batched.addModification(new WriteModification(path, node));
285
286             transaction.tell(batched, getRef());
287             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
288
289             batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
290             batched.setReady(true);
291             batched.setTotalMessagesSent(2);
292
293             transaction.tell(batched, getRef());
294             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
295             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
296
297             if(failure != null) {
298                 throw failure.cause();
299             }
300         }};
301     }
302
303     @Test(expected=IllegalStateException.class)
304     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
305         new JavaTestKit(getSystem()) {{
306
307             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
308                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
309
310             JavaTestKit watcher = new JavaTestKit(getSystem());
311             watcher.watch(transaction);
312
313             BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
314             batched.setReady(true);
315             batched.setTotalMessagesSent(2);
316
317             transaction.tell(batched, getRef());
318
319             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
320             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
321
322             if(failure != null) {
323                 throw failure.cause();
324             }
325         }};
326     }
327
328     @Test
329     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
330         new JavaTestKit(getSystem()) {{
331             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
332                     "testReadWriteTxOnReceiveCloseTransaction");
333
334             watch(transaction);
335
336             transaction.tell(new CloseTransaction().toSerializable(), getRef());
337
338             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
339             expectTerminated(duration("3 seconds"), transaction);
340         }};
341     }
342
343     @Test
344     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
345         new JavaTestKit(getSystem()) {{
346             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
347                     "testWriteTxOnReceiveCloseTransaction");
348
349             watch(transaction);
350
351             transaction.tell(new CloseTransaction().toSerializable(), getRef());
352
353             expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
354             expectTerminated(duration("3 seconds"), transaction);
355         }};
356     }
357
358     @Test
359     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
360         new JavaTestKit(getSystem()) {{
361             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
362                     "testReadOnlyTxOnReceiveCloseTransaction");
363
364             watch(transaction);
365
366             transaction.tell(new CloseTransaction().toSerializable(), getRef());
367
368             expectMsgClass(duration("3 seconds"), Terminated.class);
369         }};
370     }
371
372     @Test
373     public void testShardTransactionInactivity() {
374
375         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
376                 500, TimeUnit.MILLISECONDS).build();
377
378         new JavaTestKit(getSystem()) {{
379             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
380                     "testShardTransactionInactivity");
381
382             watch(transaction);
383
384             expectMsgClass(duration("3 seconds"), Terminated.class);
385         }};
386     }
387     public static class TestException extends RuntimeException {
388         private static final long serialVersionUID = 1L;
389     }
390 }