BUG-2138: DistributedShardListeners support for nested shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.Status.Failure;
20 import akka.actor.Terminated;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import com.google.common.base.Throwables;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.mockito.InOrder;
29 import org.mockito.Mockito;
30 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
31 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
38 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.raft.TestActorFactory;
46 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
51 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
52 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
53
54 public class ShardTransactionTest extends AbstractActorTest {
55
56     private static final TransactionType RO = TransactionType.READ_ONLY;
57     private static final TransactionType RW = TransactionType.READ_WRITE;
58     private static final TransactionType WO = TransactionType.WRITE_ONLY;
59
60     private static final ShardIdentifier SHARD_IDENTIFIER =
61         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
62
63     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
64
65     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
66
67     private TestActorRef<Shard> shard;
68     private ShardDataTree store;
69
70     @Before
71     public void setUp() {
72         shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
73                 .schemaContext(TestModel.createTestContext()).props()
74                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
75         ShardTestKit.waitUntilLeader(shard);
76         store = shard.underlyingActor().getDataStore();
77     }
78
79     private ActorRef newTransactionActor(final TransactionType type,
80             final AbstractShardDataTreeTransaction<?> transaction, final String name) {
81         Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
82                 shard.underlyingActor().getShardMBean());
83         return actorFactory.createActorNoVerify(props, name);
84     }
85
86     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
87         return store.newReadOnlyTransaction(nextTransactionId());
88     }
89
90     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
91         return store.newReadWriteTransaction(nextTransactionId());
92     }
93
94     @Test
95     public void testOnReceiveReadData() throws Exception {
96         new JavaTestKit(getSystem()) {
97             {
98                 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
99
100                 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
101             }
102
103             private void testOnReceiveReadData(final ActorRef transaction) {
104                 transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
105                         getRef());
106
107                 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
108
109                 assertNotNull(reply.getNormalizedNode());
110             }
111         };
112     }
113
114     @Test
115     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
116         new JavaTestKit(getSystem()) {
117             {
118                 testOnReceiveReadDataWhenDataNotFound(
119                         newTransactionActor(RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO"));
120
121                 testOnReceiveReadDataWhenDataNotFound(
122                         newTransactionActor(RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW"));
123             }
124
125             private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
126                 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
127
128                 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
129
130                 assertTrue(reply.getNormalizedNode() == null);
131             }
132         };
133     }
134
135     @Test
136     public void testOnReceiveDataExistsPositive() throws Exception {
137         new JavaTestKit(getSystem()) {
138             {
139                 testOnReceiveDataExistsPositive(
140                         newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
141
142                 testOnReceiveDataExistsPositive(
143                         newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
144             }
145
146             private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
147                 transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
148                         getRef());
149
150                 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
151
152                 assertTrue(reply.exists());
153             }
154         };
155     }
156
157     @Test
158     public void testOnReceiveDataExistsNegative() throws Exception {
159         new JavaTestKit(getSystem()) {
160             {
161                 testOnReceiveDataExistsNegative(
162                         newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
163
164                 testOnReceiveDataExistsNegative(
165                         newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
166             }
167
168             private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
169                 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
170
171                 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
172
173                 assertFalse(reply.exists());
174             }
175         };
176     }
177
178     @Test
179     public void testOnReceiveBatchedModifications() throws Exception {
180         new JavaTestKit(getSystem()) {
181             {
182                 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
183                 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
184                 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
185                         nextTransactionId(), mockModification);
186                 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
187
188                 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
189                 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
190                         .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
191                         .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
192
193                 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
194                 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create()
195                         .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
196                         .build();
197
198                 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
199
200                 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
201                         DataStoreVersions.CURRENT_VERSION);
202                 batched.addModification(new WriteModification(writePath, writeData));
203                 batched.addModification(new MergeModification(mergePath, mergeData));
204                 batched.addModification(new DeleteModification(deletePath));
205
206                 transaction.tell(batched, getRef());
207
208                 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
209                         BatchedModificationsReply.class);
210                 assertEquals("getNumBatched", 3, reply.getNumBatched());
211
212                 InOrder inOrder = Mockito.inOrder(mockModification);
213                 inOrder.verify(mockModification).write(writePath, writeData);
214                 inOrder.verify(mockModification).merge(mergePath, mergeData);
215                 inOrder.verify(mockModification).delete(deletePath);
216             }
217         };
218     }
219
220     @Test
221     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
222         new JavaTestKit(getSystem()) {
223             {
224                 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
225                         "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
226
227                 JavaTestKit watcher = new JavaTestKit(getSystem());
228                 watcher.watch(transaction);
229
230                 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
231                 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
232                         .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
233                         .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
234
235                 final TransactionIdentifier tx1 = nextTransactionId();
236                 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
237                 batched.addModification(new WriteModification(writePath, writeData));
238
239                 transaction.tell(batched, getRef());
240                 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
241                         BatchedModificationsReply.class);
242                 assertEquals("getNumBatched", 1, reply.getNumBatched());
243
244                 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
245                 batched.setReady(true);
246                 batched.setTotalMessagesSent(2);
247
248                 transaction.tell(batched, getRef());
249                 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
250                 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
251             }
252         };
253     }
254
255     @Test
256     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
257         new JavaTestKit(getSystem()) {
258             {
259                 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
260                         "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
261
262                 JavaTestKit watcher = new JavaTestKit(getSystem());
263                 watcher.watch(transaction);
264
265                 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
266                 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
267                         .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
268                         .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
269
270                 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
271                         DataStoreVersions.CURRENT_VERSION);
272                 batched.addModification(new WriteModification(writePath, writeData));
273                 batched.setReady(true);
274                 batched.setDoCommitOnReady(true);
275                 batched.setTotalMessagesSent(1);
276
277                 transaction.tell(batched, getRef());
278                 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
279                 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
280             }
281         };
282     }
283
284     @Test(expected = TestException.class)
285     public void testOnReceiveBatchedModificationsFailure() throws Exception {
286         new JavaTestKit(getSystem()) {
287             {
288
289                 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
290                 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
291                 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
292                         nextTransactionId(), mockModification);
293                 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
294                         "testOnReceiveBatchedModificationsFailure");
295
296                 JavaTestKit watcher = new JavaTestKit(getSystem());
297                 watcher.watch(transaction);
298
299                 YangInstanceIdentifier path = TestModel.TEST_PATH;
300                 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
301
302                 doThrow(new TestException()).when(mockModification).write(path, node);
303
304                 final TransactionIdentifier tx1 = nextTransactionId();
305                 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
306                 batched.addModification(new WriteModification(path, node));
307
308                 transaction.tell(batched, getRef());
309                 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
310
311                 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
312                 batched.setReady(true);
313                 batched.setTotalMessagesSent(2);
314
315                 transaction.tell(batched, getRef());
316                 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
317                 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
318
319                 if (failure != null) {
320                     Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
321                     Throwables.propagate(failure.cause());
322                 }
323             }
324         };
325     }
326
327     @Test(expected = IllegalStateException.class)
328     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
329         new JavaTestKit(getSystem()) {
330             {
331
332                 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
333                         "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
334
335                 JavaTestKit watcher = new JavaTestKit(getSystem());
336                 watcher.watch(transaction);
337
338                 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
339                         DataStoreVersions.CURRENT_VERSION);
340                 batched.setReady(true);
341                 batched.setTotalMessagesSent(2);
342
343                 transaction.tell(batched, getRef());
344
345                 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
346                 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
347
348                 if (failure != null) {
349                     Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
350                     Throwables.propagate(failure.cause());
351                 }
352             }
353         };
354     }
355
356     @Test
357     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
358         new JavaTestKit(getSystem()) {
359             {
360                 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
361                         "testReadWriteTxOnReceiveCloseTransaction");
362
363                 watch(transaction);
364
365                 transaction.tell(new CloseTransaction().toSerializable(), getRef());
366
367                 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
368                 expectTerminated(duration("3 seconds"), transaction);
369             }
370         };
371     }
372
373     @Test
374     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
375         new JavaTestKit(getSystem()) {
376             {
377                 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
378                         "testWriteTxOnReceiveCloseTransaction");
379
380                 watch(transaction);
381
382                 transaction.tell(new CloseTransaction().toSerializable(), getRef());
383
384                 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
385                 expectTerminated(duration("3 seconds"), transaction);
386             }
387         };
388     }
389
390     @Test
391     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
392         new JavaTestKit(getSystem()) {
393             {
394                 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
395                         "testReadOnlyTxOnReceiveCloseTransaction");
396
397                 watch(transaction);
398
399                 transaction.tell(new CloseTransaction().toSerializable(), getRef());
400
401                 expectMsgClass(duration("3 seconds"), Terminated.class);
402             }
403         };
404     }
405
406     @Test
407     public void testShardTransactionInactivity() {
408         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
409                 500, TimeUnit.MILLISECONDS).build();
410
411         new JavaTestKit(getSystem()) {
412             {
413                 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
414                         "testShardTransactionInactivity");
415
416                 watch(transaction);
417
418                 expectMsgClass(duration("3 seconds"), Terminated.class);
419             }
420         };
421     }
422
423     public static class TestException extends RuntimeException {
424         private static final long serialVersionUID = 1L;
425     }
426 }