2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.Status.Failure;
20 import akka.actor.Terminated;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import com.google.common.base.Throwables;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.mockito.InOrder;
29 import org.mockito.Mockito;
30 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
31 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
38 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.raft.TestActorFactory;
46 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
51 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
52 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
54 public class ShardTransactionTest extends AbstractActorTest {
56 private static final TransactionType RO = TransactionType.READ_ONLY;
57 private static final TransactionType RW = TransactionType.READ_WRITE;
58 private static final TransactionType WO = TransactionType.WRITE_ONLY;
60 private static final ShardIdentifier SHARD_IDENTIFIER =
61 ShardIdentifier.create("inventory", MEMBER_NAME, "config");
63 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
65 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
67 private TestActorRef<Shard> shard;
68 private ShardDataTree store;
72 shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
73 .schemaContext(TestModel.createTestContext()).props()
74 .withDispatcher(Dispatchers.DefaultDispatcherId()));
75 ShardTestKit.waitUntilLeader(shard);
76 store = shard.underlyingActor().getDataStore();
79 private ActorRef newTransactionActor(final TransactionType type,
80 final AbstractShardDataTreeTransaction<?> transaction, final String name) {
81 Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
82 shard.underlyingActor().getShardMBean());
83 return actorFactory.createActorNoVerify(props, name);
86 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
87 return store.newReadOnlyTransaction(nextTransactionId());
90 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
91 return store.newReadWriteTransaction(nextTransactionId());
95 public void testOnReceiveReadData() throws Exception {
96 new JavaTestKit(getSystem()) {
98 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
100 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
103 private void testOnReceiveReadData(final ActorRef transaction) {
104 transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
107 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
109 assertNotNull(reply.getNormalizedNode());
115 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
116 new JavaTestKit(getSystem()) {
118 testOnReceiveReadDataWhenDataNotFound(
119 newTransactionActor(RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO"));
121 testOnReceiveReadDataWhenDataNotFound(
122 newTransactionActor(RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW"));
125 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
126 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
128 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
130 assertTrue(reply.getNormalizedNode() == null);
136 public void testOnReceiveDataExistsPositive() throws Exception {
137 new JavaTestKit(getSystem()) {
139 testOnReceiveDataExistsPositive(
140 newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
142 testOnReceiveDataExistsPositive(
143 newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
146 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
147 transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
150 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
152 assertTrue(reply.exists());
158 public void testOnReceiveDataExistsNegative() throws Exception {
159 new JavaTestKit(getSystem()) {
161 testOnReceiveDataExistsNegative(
162 newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
164 testOnReceiveDataExistsNegative(
165 newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
168 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
169 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
171 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
173 assertFalse(reply.exists());
179 public void testOnReceiveBatchedModifications() throws Exception {
180 new JavaTestKit(getSystem()) {
182 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
183 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
184 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
185 nextTransactionId(), mockModification);
186 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
188 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
189 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
190 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
191 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
193 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
194 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create()
195 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
198 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
200 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
201 DataStoreVersions.CURRENT_VERSION);
202 batched.addModification(new WriteModification(writePath, writeData));
203 batched.addModification(new MergeModification(mergePath, mergeData));
204 batched.addModification(new DeleteModification(deletePath));
206 transaction.tell(batched, getRef());
208 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
209 BatchedModificationsReply.class);
210 assertEquals("getNumBatched", 3, reply.getNumBatched());
212 InOrder inOrder = Mockito.inOrder(mockModification);
213 inOrder.verify(mockModification).write(writePath, writeData);
214 inOrder.verify(mockModification).merge(mergePath, mergeData);
215 inOrder.verify(mockModification).delete(deletePath);
221 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
222 new JavaTestKit(getSystem()) {
224 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
225 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
227 JavaTestKit watcher = new JavaTestKit(getSystem());
228 watcher.watch(transaction);
230 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
231 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
232 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
233 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
235 final TransactionIdentifier tx1 = nextTransactionId();
236 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
237 batched.addModification(new WriteModification(writePath, writeData));
239 transaction.tell(batched, getRef());
240 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
241 BatchedModificationsReply.class);
242 assertEquals("getNumBatched", 1, reply.getNumBatched());
244 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
245 batched.setReady(true);
246 batched.setTotalMessagesSent(2);
248 transaction.tell(batched, getRef());
249 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
250 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
256 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
257 new JavaTestKit(getSystem()) {
259 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
260 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
262 JavaTestKit watcher = new JavaTestKit(getSystem());
263 watcher.watch(transaction);
265 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
266 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
267 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
268 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
270 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
271 DataStoreVersions.CURRENT_VERSION);
272 batched.addModification(new WriteModification(writePath, writeData));
273 batched.setReady(true);
274 batched.setDoCommitOnReady(true);
275 batched.setTotalMessagesSent(1);
277 transaction.tell(batched, getRef());
278 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
279 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
284 @Test(expected = TestException.class)
285 public void testOnReceiveBatchedModificationsFailure() throws Exception {
286 new JavaTestKit(getSystem()) {
289 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
290 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
291 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
292 nextTransactionId(), mockModification);
293 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
294 "testOnReceiveBatchedModificationsFailure");
296 JavaTestKit watcher = new JavaTestKit(getSystem());
297 watcher.watch(transaction);
299 YangInstanceIdentifier path = TestModel.TEST_PATH;
300 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
302 doThrow(new TestException()).when(mockModification).write(path, node);
304 final TransactionIdentifier tx1 = nextTransactionId();
305 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
306 batched.addModification(new WriteModification(path, node));
308 transaction.tell(batched, getRef());
309 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
311 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
312 batched.setReady(true);
313 batched.setTotalMessagesSent(2);
315 transaction.tell(batched, getRef());
316 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
317 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
319 if (failure != null) {
320 Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
321 Throwables.propagate(failure.cause());
327 @Test(expected = IllegalStateException.class)
328 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
329 new JavaTestKit(getSystem()) {
332 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
333 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
335 JavaTestKit watcher = new JavaTestKit(getSystem());
336 watcher.watch(transaction);
338 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
339 DataStoreVersions.CURRENT_VERSION);
340 batched.setReady(true);
341 batched.setTotalMessagesSent(2);
343 transaction.tell(batched, getRef());
345 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
346 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
348 if (failure != null) {
349 Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
350 Throwables.propagate(failure.cause());
357 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
358 new JavaTestKit(getSystem()) {
360 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
361 "testReadWriteTxOnReceiveCloseTransaction");
365 transaction.tell(new CloseTransaction().toSerializable(), getRef());
367 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
368 expectTerminated(duration("3 seconds"), transaction);
374 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
375 new JavaTestKit(getSystem()) {
377 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
378 "testWriteTxOnReceiveCloseTransaction");
382 transaction.tell(new CloseTransaction().toSerializable(), getRef());
384 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
385 expectTerminated(duration("3 seconds"), transaction);
391 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
392 new JavaTestKit(getSystem()) {
394 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
395 "testReadOnlyTxOnReceiveCloseTransaction");
399 transaction.tell(new CloseTransaction().toSerializable(), getRef());
401 expectMsgClass(duration("3 seconds"), Terminated.class);
407 public void testShardTransactionInactivity() {
408 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
409 500, TimeUnit.MILLISECONDS).build();
411 new JavaTestKit(getSystem()) {
413 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
414 "testShardTransactionInactivity");
418 expectMsgClass(duration("3 seconds"), Terminated.class);
423 public static class TestException extends RuntimeException {
424 private static final long serialVersionUID = 1L;