BUG-8618: eliminate SimpleShardDataTreeCohort subclasses
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.Status.Failure;
20 import akka.actor.Terminated;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import com.google.common.base.Throwables;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.mockito.InOrder;
29 import org.mockito.Mockito;
30 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
31 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
38 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.raft.TestActorFactory;
46 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
51 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
52 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
53 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
54
55 public class ShardTransactionTest extends AbstractActorTest {
56
57     private static final TransactionType RO = TransactionType.READ_ONLY;
58     private static final TransactionType RW = TransactionType.READ_WRITE;
59     private static final TransactionType WO = TransactionType.WRITE_ONLY;
60
61     private static final ShardIdentifier SHARD_IDENTIFIER =
62         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
63     private static final SchemaContext TEST_MODEL = TestModel.createTestContext();
64
65     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
66
67     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
68
69     private TestActorRef<Shard> shard;
70     private ShardDataTree store;
71
72     @Before
73     public void setUp() {
74         shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
75                 .schemaContextProvider(() -> TEST_MODEL).props()
76                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
77         ShardTestKit.waitUntilLeader(shard);
78         store = shard.underlyingActor().getDataStore();
79     }
80
81     private ActorRef newTransactionActor(final TransactionType type,
82             final AbstractShardDataTreeTransaction<?> transaction, final String name) {
83         Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
84                 shard.underlyingActor().getShardMBean());
85         return actorFactory.createActorNoVerify(props, name);
86     }
87
88     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
89         return store.newReadOnlyTransaction(nextTransactionId());
90     }
91
92     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
93         return store.newReadWriteTransaction(nextTransactionId());
94     }
95
96     @Test
97     public void testOnReceiveReadData() throws Exception {
98         new JavaTestKit(getSystem()) {
99             {
100                 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
101
102                 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
103             }
104
105             private void testOnReceiveReadData(final ActorRef transaction) {
106                 transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
107                         getRef());
108
109                 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
110
111                 assertNotNull(reply.getNormalizedNode());
112             }
113         };
114     }
115
116     @Test
117     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
118         new JavaTestKit(getSystem()) {
119             {
120                 testOnReceiveReadDataWhenDataNotFound(
121                         newTransactionActor(RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO"));
122
123                 testOnReceiveReadDataWhenDataNotFound(
124                         newTransactionActor(RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW"));
125             }
126
127             private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
128                 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
129
130                 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
131
132                 assertTrue(reply.getNormalizedNode() == null);
133             }
134         };
135     }
136
137     @Test
138     public void testOnReceiveDataExistsPositive() throws Exception {
139         new JavaTestKit(getSystem()) {
140             {
141                 testOnReceiveDataExistsPositive(
142                         newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
143
144                 testOnReceiveDataExistsPositive(
145                         newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
146             }
147
148             private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
149                 transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
150                         getRef());
151
152                 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
153
154                 assertTrue(reply.exists());
155             }
156         };
157     }
158
159     @Test
160     public void testOnReceiveDataExistsNegative() throws Exception {
161         new JavaTestKit(getSystem()) {
162             {
163                 testOnReceiveDataExistsNegative(
164                         newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
165
166                 testOnReceiveDataExistsNegative(
167                         newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
168             }
169
170             private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
171                 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
172
173                 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
174
175                 assertFalse(reply.exists());
176             }
177         };
178     }
179
180     @Test
181     public void testOnReceiveBatchedModifications() throws Exception {
182         new JavaTestKit(getSystem()) {
183             {
184                 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
185                 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
186                 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
187                         nextTransactionId(), mockModification);
188                 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
189
190                 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
191                 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
192                         .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
193                         .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
194
195                 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
196                 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create()
197                         .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
198                         .build();
199
200                 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
201
202                 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
203                         DataStoreVersions.CURRENT_VERSION);
204                 batched.addModification(new WriteModification(writePath, writeData));
205                 batched.addModification(new MergeModification(mergePath, mergeData));
206                 batched.addModification(new DeleteModification(deletePath));
207
208                 transaction.tell(batched, getRef());
209
210                 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
211                         BatchedModificationsReply.class);
212                 assertEquals("getNumBatched", 3, reply.getNumBatched());
213
214                 InOrder inOrder = Mockito.inOrder(mockModification);
215                 inOrder.verify(mockModification).write(writePath, writeData);
216                 inOrder.verify(mockModification).merge(mergePath, mergeData);
217                 inOrder.verify(mockModification).delete(deletePath);
218             }
219         };
220     }
221
222     @Test
223     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
224         new JavaTestKit(getSystem()) {
225             {
226                 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
227                         "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
228
229                 JavaTestKit watcher = new JavaTestKit(getSystem());
230                 watcher.watch(transaction);
231
232                 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
233                 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
234                         .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
235                         .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
236
237                 final TransactionIdentifier tx1 = nextTransactionId();
238                 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
239                 batched.addModification(new WriteModification(writePath, writeData));
240
241                 transaction.tell(batched, getRef());
242                 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
243                         BatchedModificationsReply.class);
244                 assertEquals("getNumBatched", 1, reply.getNumBatched());
245
246                 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
247                 batched.setReady(true);
248                 batched.setTotalMessagesSent(2);
249
250                 transaction.tell(batched, getRef());
251                 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
252                 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
253             }
254         };
255     }
256
257     @Test
258     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
259         new JavaTestKit(getSystem()) {
260             {
261                 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
262                         "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
263
264                 JavaTestKit watcher = new JavaTestKit(getSystem());
265                 watcher.watch(transaction);
266
267                 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
268                 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
269                         .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
270                         .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
271
272                 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
273                         DataStoreVersions.CURRENT_VERSION);
274                 batched.addModification(new WriteModification(writePath, writeData));
275                 batched.setReady(true);
276                 batched.setDoCommitOnReady(true);
277                 batched.setTotalMessagesSent(1);
278
279                 transaction.tell(batched, getRef());
280                 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
281                 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
282             }
283         };
284     }
285
286     @Test(expected = TestException.class)
287     public void testOnReceiveBatchedModificationsFailure() throws Exception {
288         new JavaTestKit(getSystem()) {
289             {
290
291                 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
292                 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
293                 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
294                         nextTransactionId(), mockModification);
295                 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
296                         "testOnReceiveBatchedModificationsFailure");
297
298                 JavaTestKit watcher = new JavaTestKit(getSystem());
299                 watcher.watch(transaction);
300
301                 YangInstanceIdentifier path = TestModel.TEST_PATH;
302                 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
303
304                 doThrow(new TestException()).when(mockModification).write(path, node);
305
306                 final TransactionIdentifier tx1 = nextTransactionId();
307                 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
308                 batched.addModification(new WriteModification(path, node));
309
310                 transaction.tell(batched, getRef());
311                 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
312
313                 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
314                 batched.setReady(true);
315                 batched.setTotalMessagesSent(2);
316
317                 transaction.tell(batched, getRef());
318                 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
319                 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
320
321                 if (failure != null) {
322                     Throwables.propagateIfPossible(failure.cause(), Exception.class);
323                     throw new RuntimeException(failure.cause());
324                 }
325             }
326         };
327     }
328
329     @Test(expected = IllegalStateException.class)
330     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
331         new JavaTestKit(getSystem()) {
332             {
333
334                 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
335                         "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
336
337                 JavaTestKit watcher = new JavaTestKit(getSystem());
338                 watcher.watch(transaction);
339
340                 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
341                         DataStoreVersions.CURRENT_VERSION);
342                 batched.setReady(true);
343                 batched.setTotalMessagesSent(2);
344
345                 transaction.tell(batched, getRef());
346
347                 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
348                 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
349
350                 if (failure != null) {
351                     Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
352                     Throwables.throwIfUnchecked(failure.cause());
353                     throw new RuntimeException(failure.cause());
354                 }
355             }
356         };
357     }
358
359     @Test
360     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
361         new JavaTestKit(getSystem()) {
362             {
363                 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
364                         "testReadWriteTxOnReceiveCloseTransaction");
365
366                 watch(transaction);
367
368                 transaction.tell(new CloseTransaction().toSerializable(), getRef());
369
370                 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
371                 expectTerminated(duration("3 seconds"), transaction);
372             }
373         };
374     }
375
376     @Test
377     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
378         new JavaTestKit(getSystem()) {
379             {
380                 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
381                         "testWriteTxOnReceiveCloseTransaction");
382
383                 watch(transaction);
384
385                 transaction.tell(new CloseTransaction().toSerializable(), getRef());
386
387                 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
388                 expectTerminated(duration("3 seconds"), transaction);
389             }
390         };
391     }
392
393     @Test
394     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
395         new JavaTestKit(getSystem()) {
396             {
397                 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
398                         "testReadOnlyTxOnReceiveCloseTransaction");
399
400                 watch(transaction);
401
402                 transaction.tell(new CloseTransaction().toSerializable(), getRef());
403
404                 expectMsgClass(duration("3 seconds"), Terminated.class);
405             }
406         };
407     }
408
409     @Test
410     public void testShardTransactionInactivity() {
411         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
412                 500, TimeUnit.MILLISECONDS).build();
413
414         new JavaTestKit(getSystem()) {
415             {
416                 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
417                         "testShardTransactionInactivity");
418
419                 watch(transaction);
420
421                 expectMsgClass(duration("3 seconds"), Terminated.class);
422             }
423         };
424     }
425
426     public static class TestException extends RuntimeException {
427         private static final long serialVersionUID = 1L;
428     }
429 }