2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.access.concepts.MemberName;
27 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
28 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
29 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
44 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
45 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
46 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
47 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 public class ShardTransactionTest extends AbstractActorTest {
59 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
60 private static final TransactionType RO = TransactionType.READ_ONLY;
61 private static final TransactionType RW = TransactionType.READ_WRITE;
62 private static final TransactionType WO = TransactionType.WRITE_ONLY;
64 private static final ShardIdentifier SHARD_IDENTIFIER =
65 ShardIdentifier.builder().memberName(MemberName.forName("member-1"))
66 .shardName("inventory").type("config").build();
68 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
70 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
72 private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
74 private int txCounter = 0;
76 private ActorRef createShard() {
77 ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
78 schemaContext(TestModel.createTestContext()).props());
79 ShardTestKit.waitUntilLeader(shard);
83 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
84 return newTransactionActor(type, transaction, null, name);
87 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
88 Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
89 datastoreContext, shardStats, "txn");
90 return getSystem().actorOf(props, name);
93 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
94 return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
97 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
98 return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
102 public void testOnReceiveReadData() throws Exception {
103 new JavaTestKit(getSystem()) {{
104 final ActorRef shard = createShard();
106 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
108 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
111 private void testOnReceiveReadData(final ActorRef transaction) {
112 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build(),
113 DataStoreVersions.CURRENT_VERSION),getRef());
115 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
117 assertNotNull(reply.getNormalizedNode());
122 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
123 new JavaTestKit(getSystem()) {{
124 final ActorRef shard = createShard();
126 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
127 RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
129 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
130 RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
133 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
134 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
136 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
138 assertTrue(reply.getNormalizedNode() == null);
143 public void testOnReceiveDataExistsPositive() throws Exception {
144 new JavaTestKit(getSystem()) {{
145 final ActorRef shard = createShard();
147 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
148 "testDataExistsPositiveRO"));
150 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
151 "testDataExistsPositiveRW"));
154 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
155 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build(),
156 DataStoreVersions.CURRENT_VERSION),getRef());
158 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
160 assertTrue(reply.exists());
165 public void testOnReceiveDataExistsNegative() throws Exception {
166 new JavaTestKit(getSystem()) {{
167 final ActorRef shard = createShard();
169 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
170 "testDataExistsNegativeRO"));
172 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
173 "testDataExistsNegativeRW"));
176 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
177 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
179 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
181 assertFalse(reply.exists());
186 public void testOnReceiveBatchedModifications() throws Exception {
187 new JavaTestKit(getSystem()) {{
189 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
190 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
191 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
192 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
194 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
195 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
196 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
197 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
199 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
200 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
201 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
203 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
205 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
206 batched.addModification(new WriteModification(writePath, writeData));
207 batched.addModification(new MergeModification(mergePath, mergeData));
208 batched.addModification(new DeleteModification(deletePath));
210 transaction.tell(batched, getRef());
212 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
213 assertEquals("getNumBatched", 3, reply.getNumBatched());
215 InOrder inOrder = Mockito.inOrder(mockModification);
216 inOrder.verify(mockModification).write(writePath, writeData);
217 inOrder.verify(mockModification).merge(mergePath, mergeData);
218 inOrder.verify(mockModification).delete(deletePath);
223 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
224 new JavaTestKit(getSystem()) {{
226 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
227 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
229 JavaTestKit watcher = new JavaTestKit(getSystem());
230 watcher.watch(transaction);
232 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
233 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
234 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
235 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
237 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
238 batched.addModification(new WriteModification(writePath, writeData));
240 transaction.tell(batched, getRef());
241 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
242 assertEquals("getNumBatched", 1, reply.getNumBatched());
244 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
245 batched.setReady(true);
246 batched.setTotalMessagesSent(2);
248 transaction.tell(batched, getRef());
249 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
250 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
255 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
256 new JavaTestKit(getSystem()) {{
258 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
259 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
261 JavaTestKit watcher = new JavaTestKit(getSystem());
262 watcher.watch(transaction);
264 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
265 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
266 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
267 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
269 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
270 batched.addModification(new WriteModification(writePath, writeData));
271 batched.setReady(true);
272 batched.setDoCommitOnReady(true);
273 batched.setTotalMessagesSent(1);
275 transaction.tell(batched, getRef());
276 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
277 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
281 @Test(expected=TestException.class)
282 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
283 new JavaTestKit(getSystem()) {{
285 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
286 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
287 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
288 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
289 "testOnReceiveBatchedModificationsFailure");
291 JavaTestKit watcher = new JavaTestKit(getSystem());
292 watcher.watch(transaction);
294 YangInstanceIdentifier path = TestModel.TEST_PATH;
295 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
297 doThrow(new TestException()).when(mockModification).write(path, node);
299 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
300 batched.addModification(new WriteModification(path, node));
302 transaction.tell(batched, getRef());
303 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
305 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
306 batched.setReady(true);
307 batched.setTotalMessagesSent(2);
309 transaction.tell(batched, getRef());
310 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
311 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
313 if(failure != null) {
314 throw failure.cause();
319 @Test(expected=IllegalStateException.class)
320 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
321 new JavaTestKit(getSystem()) {{
323 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
324 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
326 JavaTestKit watcher = new JavaTestKit(getSystem());
327 watcher.watch(transaction);
329 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
330 batched.setReady(true);
331 batched.setTotalMessagesSent(2);
333 transaction.tell(batched, getRef());
335 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
336 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
338 if(failure != null) {
339 throw failure.cause();
345 public void testOnReceiveCreateSnapshot() throws Exception {
346 new JavaTestKit(getSystem()) {{
347 ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
348 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
350 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
351 YangInstanceIdentifier.builder().build());
353 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
354 "testOnReceiveCreateSnapshot");
358 transaction.tell(CreateSnapshot.INSTANCE, getRef());
360 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
362 assertNotNull("getSnapshot is null", reply.getSnapshot());
364 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
365 reply.getSnapshot());
367 assertEquals("Root node", expectedRoot, actualRoot);
369 expectTerminated(duration("3 seconds"), transaction);
374 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
375 new JavaTestKit(getSystem()) {{
376 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
377 "testReadWriteTxOnReceiveCloseTransaction");
381 transaction.tell(new CloseTransaction().toSerializable(), getRef());
383 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
384 expectTerminated(duration("3 seconds"), transaction);
389 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
390 new JavaTestKit(getSystem()) {{
391 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
392 "testWriteTxOnReceiveCloseTransaction");
396 transaction.tell(new CloseTransaction().toSerializable(), getRef());
398 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
399 expectTerminated(duration("3 seconds"), transaction);
404 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
405 new JavaTestKit(getSystem()) {{
406 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
407 "testReadOnlyTxOnReceiveCloseTransaction");
411 transaction.tell(new CloseTransaction().toSerializable(), getRef());
413 expectMsgClass(duration("3 seconds"), Terminated.class);
417 @Test(expected=UnknownMessageException.class)
418 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
419 final ActorRef shard = createShard();
420 final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
421 datastoreContext, shardStats, "txn");
422 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
424 transaction.receive(new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null),
425 ActorRef.noSender());
429 public void testShardTransactionInactivity() {
431 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
432 500, TimeUnit.MILLISECONDS).build();
434 new JavaTestKit(getSystem()) {{
435 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
436 "testShardTransactionInactivity");
440 expectMsgClass(duration("3 seconds"), Terminated.class);
445 public void testOnReceivePreBoronReadData() throws Exception {
446 new JavaTestKit(getSystem()) {{
447 ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
448 "testOnReceivePreBoronReadData");
450 transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
451 toSerializable(), getRef());
453 Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
454 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
459 public void testOnReceivePreBoronDataExists() throws Exception {
460 new JavaTestKit(getSystem()) {{
461 ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
462 "testOnReceivePreBoronDataExists");
464 transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
465 toSerializable(), getRef());
467 Object replySerialized = expectMsgClass(duration("5 seconds"),
468 ShardTransactionMessages.DataExistsReply.class);
469 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
473 public static class TestException extends RuntimeException {
474 private static final long serialVersionUID = 1L;