/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status.Failure;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
52 public class ShardTransactionTest extends AbstractActorTest {
54 private static final TransactionType RO = TransactionType.READ_ONLY;
55 private static final TransactionType RW = TransactionType.READ_WRITE;
56 private static final TransactionType WO = TransactionType.WRITE_ONLY;
58 private static final ShardIdentifier SHARD_IDENTIFIER =
59 ShardIdentifier.create("inventory", MEMBER_NAME, "config");
61 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
63 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
65 private TestActorRef<Shard> shard;
66 private ShardDataTree store;
70 shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
71 schemaContext(TestModel.createTestContext()).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
72 ShardTestKit.waitUntilLeader(shard);
73 store = shard.underlyingActor().getDataStore();
76 private ActorRef newTransactionActor(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction, final String name) {
77 Props props = ShardTransaction.props(type, transaction, shard, datastoreContext, shard.underlyingActor().getShardMBean());
78 return actorFactory.createActor(props, name);
81 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
82 return store.newReadOnlyTransaction(nextTransactionId());
85 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
86 return store.newReadWriteTransaction(nextTransactionId());
90 public void testOnReceiveReadData() throws Exception {
91 new JavaTestKit(getSystem()) {{
92 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
94 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
97 private void testOnReceiveReadData(final ActorRef transaction) {
98 transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY,
99 DataStoreVersions.CURRENT_VERSION), getRef());
101 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
103 assertNotNull(reply.getNormalizedNode());
108 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
109 new JavaTestKit(getSystem()) {{
110 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
111 RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO"));
113 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
114 RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW"));
117 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
118 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
120 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
122 assertTrue(reply.getNormalizedNode() == null);
127 public void testOnReceiveDataExistsPositive() throws Exception {
128 new JavaTestKit(getSystem()) {{
129 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(),
130 "testDataExistsPositiveRO"));
132 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(),
133 "testDataExistsPositiveRW"));
136 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
137 transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY,
138 DataStoreVersions.CURRENT_VERSION), getRef());
140 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
142 assertTrue(reply.exists());
147 public void testOnReceiveDataExistsNegative() throws Exception {
148 new JavaTestKit(getSystem()) {{
149 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(),
150 "testDataExistsNegativeRO"));
152 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(),
153 "testDataExistsNegativeRW"));
156 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
157 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
159 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
161 assertFalse(reply.exists());
166 public void testOnReceiveBatchedModifications() throws Exception {
167 new JavaTestKit(getSystem()) {{
169 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
170 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
171 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
172 nextTransactionId(), mockModification);
173 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
175 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
176 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
177 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
178 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
180 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
181 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
182 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
184 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
186 BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
187 batched.addModification(new WriteModification(writePath, writeData));
188 batched.addModification(new MergeModification(mergePath, mergeData));
189 batched.addModification(new DeleteModification(deletePath));
191 transaction.tell(batched, getRef());
193 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
194 assertEquals("getNumBatched", 3, reply.getNumBatched());
196 InOrder inOrder = Mockito.inOrder(mockModification);
197 inOrder.verify(mockModification).write(writePath, writeData);
198 inOrder.verify(mockModification).merge(mergePath, mergeData);
199 inOrder.verify(mockModification).delete(deletePath);
204 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
205 new JavaTestKit(getSystem()) {{
207 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
208 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
210 JavaTestKit watcher = new JavaTestKit(getSystem());
211 watcher.watch(transaction);
213 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
214 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
215 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
216 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
218 final TransactionIdentifier tx1 = nextTransactionId();
219 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
220 batched.addModification(new WriteModification(writePath, writeData));
222 transaction.tell(batched, getRef());
223 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
224 assertEquals("getNumBatched", 1, reply.getNumBatched());
226 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
227 batched.setReady(true);
228 batched.setTotalMessagesSent(2);
230 transaction.tell(batched, getRef());
231 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
232 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
237 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
238 new JavaTestKit(getSystem()) {{
240 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
241 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
243 JavaTestKit watcher = new JavaTestKit(getSystem());
244 watcher.watch(transaction);
246 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
247 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
248 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
249 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
251 BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
252 batched.addModification(new WriteModification(writePath, writeData));
253 batched.setReady(true);
254 batched.setDoCommitOnReady(true);
255 batched.setTotalMessagesSent(1);
257 transaction.tell(batched, getRef());
258 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
259 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
263 @Test(expected=TestException.class)
264 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
265 new JavaTestKit(getSystem()) {{
267 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
268 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
269 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
270 nextTransactionId(), mockModification);
271 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
272 "testOnReceiveBatchedModificationsFailure");
274 JavaTestKit watcher = new JavaTestKit(getSystem());
275 watcher.watch(transaction);
277 YangInstanceIdentifier path = TestModel.TEST_PATH;
278 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
280 doThrow(new TestException()).when(mockModification).write(path, node);
282 final TransactionIdentifier tx1 = nextTransactionId();
283 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
284 batched.addModification(new WriteModification(path, node));
286 transaction.tell(batched, getRef());
287 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
289 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
290 batched.setReady(true);
291 batched.setTotalMessagesSent(2);
293 transaction.tell(batched, getRef());
294 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
295 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
297 if(failure != null) {
298 throw failure.cause();
303 @Test(expected=IllegalStateException.class)
304 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
305 new JavaTestKit(getSystem()) {{
307 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
308 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
310 JavaTestKit watcher = new JavaTestKit(getSystem());
311 watcher.watch(transaction);
313 BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
314 batched.setReady(true);
315 batched.setTotalMessagesSent(2);
317 transaction.tell(batched, getRef());
319 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
320 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
322 if(failure != null) {
323 throw failure.cause();
329 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
330 new JavaTestKit(getSystem()) {{
331 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
332 "testReadWriteTxOnReceiveCloseTransaction");
336 transaction.tell(new CloseTransaction().toSerializable(), getRef());
338 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
339 expectTerminated(duration("3 seconds"), transaction);
344 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
345 new JavaTestKit(getSystem()) {{
346 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
347 "testWriteTxOnReceiveCloseTransaction");
351 transaction.tell(new CloseTransaction().toSerializable(), getRef());
353 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
354 expectTerminated(duration("3 seconds"), transaction);
359 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
360 new JavaTestKit(getSystem()) {{
361 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
362 "testReadOnlyTxOnReceiveCloseTransaction");
366 transaction.tell(new CloseTransaction().toSerializable(), getRef());
368 expectMsgClass(duration("3 seconds"), Terminated.class);
373 public void testShardTransactionInactivity() {
375 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
376 500, TimeUnit.MILLISECONDS).build();
378 new JavaTestKit(getSystem()) {{
379 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
380 "testShardTransactionInactivity");
384 expectMsgClass(duration("3 seconds"), Terminated.class);
387 public static class TestException extends RuntimeException {
388 private static final long serialVersionUID = 1L;