Integrate MRI projects for Neon
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import static org.mockito.Mockito.inOrder;
17 import static org.mockito.Mockito.mock;
18
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.actor.Status.Failure;
22 import akka.actor.Terminated;
23 import akka.dispatch.Dispatchers;
24 import akka.testkit.TestActorRef;
25 import akka.testkit.javadsl.TestKit;
26 import com.google.common.base.Throwables;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.InOrder;
31 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
32 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
34 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
39 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
44 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
45 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
46 import org.opendaylight.controller.cluster.raft.TestActorFactory;
47 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
52 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
53 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55
56 public class ShardTransactionTest extends AbstractActorTest {
57
58     private static final TransactionType RO = TransactionType.READ_ONLY;
59     private static final TransactionType RW = TransactionType.READ_WRITE;
60     private static final TransactionType WO = TransactionType.WRITE_ONLY;
61
62     private static final ShardIdentifier SHARD_IDENTIFIER =
63         ShardIdentifier.create("inventory", MEMBER_NAME, "config");
64     private static final SchemaContext TEST_MODEL = TestModel.createTestContext();
65
66     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
67
68     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
69
70     private TestActorRef<Shard> shard;
71     private ShardDataTree store;
72     private TestKit testKit;
73
74     @Before
75     public void setUp() {
76         shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
77                 .schemaContextProvider(() -> TEST_MODEL).props()
78                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
79         ShardTestKit.waitUntilLeader(shard);
80         store = shard.underlyingActor().getDataStore();
81         testKit = new TestKit(getSystem());
82     }
83
84     private ActorRef newTransactionActor(final TransactionType type,
85             final AbstractShardDataTreeTransaction<?> transaction, final String name) {
86         Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
87                 shard.underlyingActor().getShardMBean());
88         return actorFactory.createActorNoVerify(props, name);
89     }
90
91     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
92         return store.newReadOnlyTransaction(nextTransactionId());
93     }
94
95     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
96         return store.newReadWriteTransaction(nextTransactionId());
97     }
98
99     @Test
100     public void testOnReceiveReadData() {
101         testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
102         testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
103     }
104
105     private void testOnReceiveReadData(final ActorRef transaction) {
106         transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
107             testKit.getRef());
108
109         ReadDataReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ReadDataReply.class);
110
111         assertNotNull(reply.getNormalizedNode());
112     }
113
114     @Test
115     public void testOnReceiveReadDataWhenDataNotFound() {
116         testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
117             "testReadDataWhenDataNotFoundRO"));
118         testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
119             "testReadDataWhenDataNotFoundRW"));
120     }
121
122     private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
123         transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
124
125         ReadDataReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ReadDataReply.class);
126
127         assertNull(reply.getNormalizedNode());
128     }
129
130     @Test
131     public void testOnReceiveDataExistsPositive() {
132         testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
133         testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
134     }
135
136     private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
137         transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
138             testKit.getRef());
139
140         DataExistsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), DataExistsReply.class);
141
142         assertTrue(reply.exists());
143     }
144
145     @Test
146     public void testOnReceiveDataExistsNegative() {
147         testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
148         testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
149     }
150
151     private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
152         transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
153
154         DataExistsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), DataExistsReply.class);
155
156         assertFalse(reply.exists());
157     }
158
159     @Test
160     public void testOnReceiveBatchedModifications() {
161         ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
162         DataTreeModification mockModification = mock(DataTreeModification.class);
163         ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
164             nextTransactionId(), mockModification);
165         final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
166
167         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
168         NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
169                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
170                 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
171
172         YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
173         NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create()
174                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
175                 .build();
176
177         YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
178
179         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
180             DataStoreVersions.CURRENT_VERSION);
181         batched.addModification(new WriteModification(writePath, writeData));
182         batched.addModification(new MergeModification(mergePath, mergeData));
183         batched.addModification(new DeleteModification(deletePath));
184
185         transaction.tell(batched, testKit.getRef());
186
187         BatchedModificationsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
188             BatchedModificationsReply.class);
189         assertEquals("getNumBatched", 3, reply.getNumBatched());
190
191         InOrder inOrder = inOrder(mockModification);
192         inOrder.verify(mockModification).write(writePath, writeData);
193         inOrder.verify(mockModification).merge(mergePath, mergeData);
194         inOrder.verify(mockModification).delete(deletePath);
195     }
196
197     @Test
198     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
199         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
200                 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
201
202         TestKit watcher = new TestKit(getSystem());
203         watcher.watch(transaction);
204
205         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
206         NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
207                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
208                 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
209
210         final TransactionIdentifier tx1 = nextTransactionId();
211         BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
212         batched.addModification(new WriteModification(writePath, writeData));
213
214         transaction.tell(batched, testKit.getRef());
215         BatchedModificationsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
216             BatchedModificationsReply.class);
217         assertEquals("getNumBatched", 1, reply.getNumBatched());
218
219         batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
220         batched.setReady();
221         batched.setTotalMessagesSent(2);
222
223         transaction.tell(batched, testKit.getRef());
224         testKit.expectMsgClass(testKit.duration("5 seconds"), ReadyTransactionReply.class);
225         watcher.expectMsgClass(watcher.duration("5 seconds"), Terminated.class);
226     }
227
228     @Test
229     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
230         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
231                 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
232
233         TestKit watcher = new TestKit(getSystem());
234         watcher.watch(transaction);
235
236         YangInstanceIdentifier writePath = TestModel.TEST_PATH;
237         NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
238                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
239                 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
240
241         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
242             DataStoreVersions.CURRENT_VERSION);
243         batched.addModification(new WriteModification(writePath, writeData));
244         batched.setReady();
245         batched.setDoCommitOnReady(true);
246         batched.setTotalMessagesSent(1);
247
248         transaction.tell(batched, testKit.getRef());
249         testKit.expectMsgClass(testKit.duration("5 seconds"), CommitTransactionReply.class);
250         watcher.expectMsgClass(testKit.duration("5 seconds"), Terminated.class);
251     }
252
253     @Test(expected = TestException.class)
254     public void testOnReceiveBatchedModificationsFailure() throws Exception {
255         ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
256         DataTreeModification mockModification = mock(DataTreeModification.class);
257         ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
258             nextTransactionId(), mockModification);
259         final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
260             "testOnReceiveBatchedModificationsFailure");
261
262         TestKit watcher = new TestKit(getSystem());
263         watcher.watch(transaction);
264
265         YangInstanceIdentifier path = TestModel.TEST_PATH;
266         ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
267
268         doThrow(new TestException()).when(mockModification).write(path, node);
269
270         final TransactionIdentifier tx1 = nextTransactionId();
271         BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
272         batched.addModification(new WriteModification(path, node));
273
274         transaction.tell(batched, testKit.getRef());
275         testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
276
277         batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
278         batched.setReady();
279         batched.setTotalMessagesSent(2);
280
281         transaction.tell(batched, testKit.getRef());
282         Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
283         watcher.expectMsgClass(testKit.duration("5 seconds"), Terminated.class);
284
285         if (failure != null) {
286             Throwables.propagateIfPossible(failure.cause(), Exception.class);
287             throw new RuntimeException(failure.cause());
288         }
289     }
290
291     @Test(expected = IllegalStateException.class)
292     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
293         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
294                 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
295
296         TestKit watcher = new TestKit(getSystem());
297         watcher.watch(transaction);
298
299         BatchedModifications batched = new BatchedModifications(nextTransactionId(),
300             DataStoreVersions.CURRENT_VERSION);
301         batched.setReady();
302         batched.setTotalMessagesSent(2);
303
304         transaction.tell(batched, testKit.getRef());
305
306         Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
307         watcher.expectMsgClass(watcher.duration("5 seconds"), Terminated.class);
308
309         if (failure != null) {
310             Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
311             Throwables.throwIfUnchecked(failure.cause());
312             throw new RuntimeException(failure.cause());
313         }
314     }
315
316     @Test
317     public void testReadWriteTxOnReceiveCloseTransaction() {
318         final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
319                 "testReadWriteTxOnReceiveCloseTransaction");
320
321         testKit.watch(transaction);
322
323         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
324
325         testKit.expectMsgClass(testKit.duration("3 seconds"), CloseTransactionReply.class);
326         testKit.expectTerminated(testKit.duration("3 seconds"), transaction);
327     }
328
329     @Test
330     public void testWriteOnlyTxOnReceiveCloseTransaction() {
331         final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
332                 "testWriteTxOnReceiveCloseTransaction");
333
334         testKit.watch(transaction);
335
336         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
337
338         testKit.expectMsgClass(testKit.duration("3 seconds"), CloseTransactionReply.class);
339         testKit.expectTerminated(testKit.duration("3 seconds"), transaction);
340     }
341
342     @Test
343     public void testReadOnlyTxOnReceiveCloseTransaction() {
344         final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
345                 "testReadOnlyTxOnReceiveCloseTransaction");
346
347         testKit.watch(transaction);
348
349         transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
350
351         testKit.expectMsgClass(testKit.duration("3 seconds"), Terminated.class);
352     }
353
354     @Test
355     public void testShardTransactionInactivity() {
356         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
357                 500, TimeUnit.MILLISECONDS).build();
358
359         final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
360             "testShardTransactionInactivity");
361
362         testKit.watch(transaction);
363
364         testKit.expectMsgClass(testKit.duration("3 seconds"), Terminated.class);
365     }
366
367     public static class TestException extends RuntimeException {
368         private static final long serialVersionUID = 1L;
369     }
370 }