1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import static org.mockito.Mockito.doThrow;
8 import akka.actor.ActorRef;
9 import akka.actor.Props;
10 import akka.actor.Status.Failure;
11 import akka.actor.Terminated;
12 import akka.testkit.JavaTestKit;
13 import akka.testkit.TestActorRef;
14 import java.util.Collections;
15 import java.util.concurrent.TimeUnit;
16 import org.junit.Test;
17 import org.mockito.InOrder;
18 import org.mockito.Mockito;
19 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
20 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
33 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
34 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
40 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
41 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.Modification;
45 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
46 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
47 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
48 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
50 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
51 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
56 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
57 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
60 public class ShardTransactionTest extends AbstractActorTest {
62 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
63 private static final TransactionType RO = TransactionType.READ_ONLY;
64 private static final TransactionType RW = TransactionType.READ_WRITE;
65 private static final TransactionType WO = TransactionType.WRITE_ONLY;
67 private static final ShardIdentifier SHARD_IDENTIFIER =
68 ShardIdentifier.builder().memberName("member-1")
69 .shardName("inventory").type("config").build();
71 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
73 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
75 private final ShardDataTree store = new ShardDataTree(testSchemaContext);
77 private int txCounter = 0;
79 private ActorRef createShard() {
80 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
81 Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
84 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
85 return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
88 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
89 return newTransactionActor(type, transaction, null, name, version);
92 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
93 return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
96 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
98 Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
99 datastoreContext, shardStats, "txn", version);
100 return getSystem().actorOf(props, name);
103 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
104 return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
107 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
108 return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
112 public void testOnReceiveReadData() throws Exception {
113 new JavaTestKit(getSystem()) {{
114 final ActorRef shard = createShard();
116 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
118 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
121 private void testOnReceiveReadData(final ActorRef transaction) {
123 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
126 Object replySerialized =
127 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
129 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
132 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
134 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
136 assertNotNull(reply.getNormalizedNode());
141 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
142 new JavaTestKit(getSystem()) {{
143 final ActorRef shard = createShard();
145 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
146 RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
148 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
149 RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
152 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
154 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
156 Object replySerialized =
157 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
159 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
162 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
164 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
166 assertTrue(reply.getNormalizedNode() == null);
171 public void testOnReceiveReadDataHeliumR1() throws Exception {
172 new JavaTestKit(getSystem()) {{
173 ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
174 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
176 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
179 ShardTransactionMessages.ReadDataReply replySerialized =
180 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
182 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
187 public void testOnReceiveDataExistsPositive() throws Exception {
188 new JavaTestKit(getSystem()) {{
189 final ActorRef shard = createShard();
191 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
192 "testDataExistsPositiveRO"));
194 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
195 "testDataExistsPositiveRW"));
198 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
199 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
202 ShardTransactionMessages.DataExistsReply replySerialized =
203 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
205 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
208 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
210 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
212 assertTrue(reply.exists());
217 public void testOnReceiveDataExistsNegative() throws Exception {
218 new JavaTestKit(getSystem()) {{
219 final ActorRef shard = createShard();
221 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
222 "testDataExistsNegativeRO"));
224 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
225 "testDataExistsNegativeRW"));
228 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
229 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
231 ShardTransactionMessages.DataExistsReply replySerialized =
232 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
234 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
237 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
239 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
241 assertFalse(reply.exists());
245 private void assertModification(final ActorRef subject,
246 final Class<? extends Modification> modificationType) {
247 new JavaTestKit(getSystem()) {{
248 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
250 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
251 GetCompositeModificationReply.class).getModification();
253 assertTrue(compositeModification.getModifications().size() == 1);
254 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
259 public void testOnReceiveWriteData() {
260 new JavaTestKit(getSystem()) {{
261 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
262 "testOnReceiveWriteData");
264 transaction.tell(new WriteData(TestModel.TEST_PATH,
265 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
266 toSerializable(), getRef());
268 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
270 assertModification(transaction, WriteModification.class);
272 // unserialized write
273 transaction.tell(new WriteData(TestModel.TEST_PATH,
274 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
277 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
282 public void testOnReceiveHeliumR1WriteData() {
283 new JavaTestKit(getSystem()) {{
284 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
285 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
287 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
288 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
289 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
290 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
291 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
293 transaction.tell(serialized, getRef());
295 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
297 assertModification(transaction, WriteModification.class);
302 public void testOnReceiveMergeData() {
303 new JavaTestKit(getSystem()) {{
304 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
307 transaction.tell(new MergeData(TestModel.TEST_PATH,
308 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
309 toSerializable(), getRef());
311 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
313 assertModification(transaction, MergeModification.class);
316 transaction.tell(new MergeData(TestModel.TEST_PATH,
317 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
320 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
325 public void testOnReceiveHeliumR1MergeData() throws Exception {
326 new JavaTestKit(getSystem()) {{
327 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
328 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
330 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
331 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
332 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
333 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
334 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
336 transaction.tell(serialized, getRef());
338 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
340 assertModification(transaction, MergeModification.class);
345 public void testOnReceiveDeleteData() throws Exception {
346 new JavaTestKit(getSystem()) {{
347 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
350 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
351 toSerializable(), getRef());
353 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
355 assertModification(transaction, DeleteModification.class);
358 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
360 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
365 public void testOnReceiveBatchedModifications() throws Exception {
366 new JavaTestKit(getSystem()) {{
368 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
369 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
370 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
371 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
373 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
374 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
375 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
376 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
378 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
379 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
380 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
382 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
384 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
385 batched.addModification(new WriteModification(writePath, writeData));
386 batched.addModification(new MergeModification(mergePath, mergeData));
387 batched.addModification(new DeleteModification(deletePath));
389 transaction.tell(batched, getRef());
391 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
392 assertEquals("getNumBatched", 3, reply.getNumBatched());
394 JavaTestKit verification = new JavaTestKit(getSystem());
395 transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
397 CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
398 GetCompositeModificationReply.class).getModification();
400 assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
402 WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
403 assertEquals("getPath", writePath, write.getPath());
404 assertEquals("getData", writeData, write.getData());
406 MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
407 assertEquals("getPath", mergePath, merge.getPath());
408 assertEquals("getData", mergeData, merge.getData());
410 DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
411 assertEquals("getPath", deletePath, delete.getPath());
413 InOrder inOrder = Mockito.inOrder(mockModification);
414 inOrder.verify(mockModification).write(writePath, writeData);
415 inOrder.verify(mockModification).merge(mergePath, mergeData);
416 inOrder.verify(mockModification).delete(deletePath);
421 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
422 new JavaTestKit(getSystem()) {{
424 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
425 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
427 JavaTestKit watcher = new JavaTestKit(getSystem());
428 watcher.watch(transaction);
430 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
431 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
432 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
433 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
435 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
436 batched.addModification(new WriteModification(writePath, writeData));
438 transaction.tell(batched, getRef());
439 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
440 assertEquals("getNumBatched", 1, reply.getNumBatched());
442 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
443 batched.setReady(true);
444 batched.setTotalMessagesSent(2);
446 transaction.tell(batched, getRef());
447 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
448 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
453 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
454 new JavaTestKit(getSystem()) {{
456 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
457 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
459 JavaTestKit watcher = new JavaTestKit(getSystem());
460 watcher.watch(transaction);
462 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
463 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
464 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
465 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
467 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
468 batched.addModification(new WriteModification(writePath, writeData));
469 batched.setReady(true);
470 batched.setDoCommitOnReady(true);
471 batched.setTotalMessagesSent(1);
473 transaction.tell(batched, getRef());
474 expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
475 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
479 @Test(expected=TestException.class)
480 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
481 new JavaTestKit(getSystem()) {{
483 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
484 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
485 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
486 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
487 "testOnReceiveBatchedModificationsFailure");
489 JavaTestKit watcher = new JavaTestKit(getSystem());
490 watcher.watch(transaction);
492 YangInstanceIdentifier path = TestModel.TEST_PATH;
493 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
495 doThrow(new TestException()).when(mockModification).write(path, node);
497 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
498 batched.addModification(new WriteModification(path, node));
500 transaction.tell(batched, getRef());
501 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
503 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
504 batched.setReady(true);
505 batched.setTotalMessagesSent(2);
507 transaction.tell(batched, getRef());
508 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
509 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
511 if(failure != null) {
512 throw failure.cause();
517 @Test(expected=IllegalStateException.class)
518 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
519 new JavaTestKit(getSystem()) {{
521 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
522 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
524 JavaTestKit watcher = new JavaTestKit(getSystem());
525 watcher.watch(transaction);
527 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
528 batched.setReady(true);
529 batched.setTotalMessagesSent(2);
531 transaction.tell(batched, getRef());
533 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
534 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
536 if(failure != null) {
537 throw failure.cause();
543 public void testOnReceivePreLithiumReadyTransaction() throws Exception {
544 new JavaTestKit(getSystem()) {{
545 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
546 "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
548 JavaTestKit watcher = new JavaTestKit(getSystem());
549 watcher.watch(transaction);
551 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
553 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
554 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
558 new JavaTestKit(getSystem()) {{
559 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
560 "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
562 JavaTestKit watcher = new JavaTestKit(getSystem());
563 watcher.watch(transaction);
565 transaction.tell(new ReadyTransaction(), getRef());
567 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
568 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
573 public void testOnReceiveCreateSnapshot() throws Exception {
574 new JavaTestKit(getSystem()) {{
575 ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
576 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
578 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
579 YangInstanceIdentifier.builder().build());
581 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
582 "testOnReceiveCreateSnapshot");
586 transaction.tell(CreateSnapshot.INSTANCE, getRef());
588 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
590 assertNotNull("getSnapshot is null", reply.getSnapshot());
592 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
593 reply.getSnapshot());
595 assertEquals("Root node", expectedRoot, actualRoot);
597 expectTerminated(duration("3 seconds"), transaction);
602 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
603 new JavaTestKit(getSystem()) {{
604 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
605 "testReadWriteTxOnReceiveCloseTransaction");
609 transaction.tell(new CloseTransaction().toSerializable(), getRef());
611 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
612 expectTerminated(duration("3 seconds"), transaction);
617 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
618 new JavaTestKit(getSystem()) {{
619 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
620 "testWriteTxOnReceiveCloseTransaction");
624 transaction.tell(new CloseTransaction().toSerializable(), getRef());
626 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
627 expectTerminated(duration("3 seconds"), transaction);
632 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
633 new JavaTestKit(getSystem()) {{
634 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
635 "testReadOnlyTxOnReceiveCloseTransaction");
639 transaction.tell(new CloseTransaction().toSerializable(), getRef());
641 expectMsgClass(duration("3 seconds"), Terminated.class);
645 @Test(expected=UnknownMessageException.class)
646 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
647 final ActorRef shard = createShard();
648 final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
649 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
650 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
652 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
653 toSerializable(), ActorRef.noSender());
657 public void testShardTransactionInactivity() {
659 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
660 500, TimeUnit.MILLISECONDS).build();
662 new JavaTestKit(getSystem()) {{
663 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
664 "testShardTransactionInactivity");
668 expectMsgClass(duration("3 seconds"), Terminated.class);
672 public static class TestException extends RuntimeException {
673 private static final long serialVersionUID = 1L;