1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import static org.mockito.Mockito.doThrow;
8 import akka.actor.ActorRef;
9 import akka.actor.Props;
10 import akka.actor.Status.Failure;
11 import akka.actor.Terminated;
12 import akka.testkit.JavaTestKit;
13 import akka.testkit.TestActorRef;
14 import java.util.Collections;
15 import java.util.concurrent.TimeUnit;
16 import org.junit.Test;
17 import org.mockito.InOrder;
18 import org.mockito.Mockito;
19 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
20 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
21 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
22 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
42 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
44 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
45 import org.opendaylight.controller.cluster.datastore.modification.Modification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
48 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
49 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
51 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
52 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
57 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
58 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
59 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
61 public class ShardTransactionTest extends AbstractActorTest {
63 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
64 private static final TransactionType RO = TransactionType.READ_ONLY;
65 private static final TransactionType RW = TransactionType.READ_WRITE;
66 private static final TransactionType WO = TransactionType.WRITE_ONLY;
68 private static final ShardIdentifier SHARD_IDENTIFIER =
69 ShardIdentifier.builder().memberName("member-1")
70 .shardName("inventory").type("config").build();
72 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
74 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
76 private final ShardDataTree store = new ShardDataTree(testSchemaContext);
78 private int txCounter = 0;
80 private ActorRef createShard() {
81 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
82 Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
85 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
86 return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
89 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
90 return newTransactionActor(type, transaction, null, name, version);
93 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
94 return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
97 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
99 Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
100 datastoreContext, shardStats, "txn", version);
101 return getSystem().actorOf(props, name);
104 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
105 return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
108 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
109 return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
113 public void testOnReceiveReadData() throws Exception {
114 new JavaTestKit(getSystem()) {{
115 final ActorRef shard = createShard();
117 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
119 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
122 private void testOnReceiveReadData(final ActorRef transaction) {
124 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
127 Object replySerialized =
128 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
130 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
133 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
135 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
137 assertNotNull(reply.getNormalizedNode());
142 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
143 new JavaTestKit(getSystem()) {{
144 final ActorRef shard = createShard();
146 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
147 RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
149 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
150 RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
153 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
155 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
157 Object replySerialized =
158 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
160 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
163 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
165 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
167 assertTrue(reply.getNormalizedNode() == null);
172 public void testOnReceiveReadDataHeliumR1() throws Exception {
173 new JavaTestKit(getSystem()) {{
174 ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
175 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
177 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
180 ShardTransactionMessages.ReadDataReply replySerialized =
181 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
183 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
188 public void testOnReceiveDataExistsPositive() throws Exception {
189 new JavaTestKit(getSystem()) {{
190 final ActorRef shard = createShard();
192 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
193 "testDataExistsPositiveRO"));
195 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
196 "testDataExistsPositiveRW"));
199 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
200 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
203 ShardTransactionMessages.DataExistsReply replySerialized =
204 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
206 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
209 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
211 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
213 assertTrue(reply.exists());
218 public void testOnReceiveDataExistsNegative() throws Exception {
219 new JavaTestKit(getSystem()) {{
220 final ActorRef shard = createShard();
222 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
223 "testDataExistsNegativeRO"));
225 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
226 "testDataExistsNegativeRW"));
229 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
230 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
232 ShardTransactionMessages.DataExistsReply replySerialized =
233 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
235 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
238 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
240 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
242 assertFalse(reply.exists());
246 private void assertModification(final ActorRef subject,
247 final Class<? extends Modification> modificationType) {
248 new JavaTestKit(getSystem()) {{
249 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
251 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
252 GetCompositeModificationReply.class).getModification();
254 assertTrue(compositeModification.getModifications().size() == 1);
255 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
260 public void testOnReceiveWriteData() {
261 new JavaTestKit(getSystem()) {{
262 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
263 "testOnReceiveWriteData");
265 transaction.tell(new WriteData(TestModel.TEST_PATH,
266 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
267 toSerializable(), getRef());
269 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
271 assertModification(transaction, WriteModification.class);
273 // unserialized write
274 transaction.tell(new WriteData(TestModel.TEST_PATH,
275 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
278 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
283 public void testOnReceiveHeliumR1WriteData() {
284 new JavaTestKit(getSystem()) {{
285 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
286 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
288 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
289 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
290 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
291 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
292 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
294 transaction.tell(serialized, getRef());
296 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
298 assertModification(transaction, WriteModification.class);
303 public void testOnReceiveMergeData() {
304 new JavaTestKit(getSystem()) {{
305 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
308 transaction.tell(new MergeData(TestModel.TEST_PATH,
309 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
310 toSerializable(), getRef());
312 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
314 assertModification(transaction, MergeModification.class);
317 transaction.tell(new MergeData(TestModel.TEST_PATH,
318 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
321 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
326 public void testOnReceiveHeliumR1MergeData() throws Exception {
327 new JavaTestKit(getSystem()) {{
328 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
329 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
331 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
332 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
333 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
334 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
335 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
337 transaction.tell(serialized, getRef());
339 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
341 assertModification(transaction, MergeModification.class);
346 public void testOnReceiveDeleteData() throws Exception {
347 new JavaTestKit(getSystem()) {{
348 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
351 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
352 toSerializable(), getRef());
354 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
356 assertModification(transaction, DeleteModification.class);
359 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
361 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
366 public void testOnReceiveBatchedModifications() throws Exception {
367 new JavaTestKit(getSystem()) {{
369 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
370 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
371 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
372 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
374 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
375 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
376 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
377 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
379 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
380 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
381 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
383 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
385 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
386 batched.addModification(new WriteModification(writePath, writeData));
387 batched.addModification(new MergeModification(mergePath, mergeData));
388 batched.addModification(new DeleteModification(deletePath));
390 transaction.tell(batched, getRef());
392 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
393 assertEquals("getNumBatched", 3, reply.getNumBatched());
395 JavaTestKit verification = new JavaTestKit(getSystem());
396 transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
398 CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
399 GetCompositeModificationReply.class).getModification();
401 assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
403 WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
404 assertEquals("getPath", writePath, write.getPath());
405 assertEquals("getData", writeData, write.getData());
407 MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
408 assertEquals("getPath", mergePath, merge.getPath());
409 assertEquals("getData", mergeData, merge.getData());
411 DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
412 assertEquals("getPath", deletePath, delete.getPath());
414 InOrder inOrder = Mockito.inOrder(mockModification);
415 inOrder.verify(mockModification).write(writePath, writeData);
416 inOrder.verify(mockModification).merge(mergePath, mergeData);
417 inOrder.verify(mockModification).delete(deletePath);
422 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
423 new JavaTestKit(getSystem()) {{
425 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
426 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
428 JavaTestKit watcher = new JavaTestKit(getSystem());
429 watcher.watch(transaction);
431 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
432 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
433 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
434 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
436 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
437 batched.addModification(new WriteModification(writePath, writeData));
439 transaction.tell(batched, getRef());
440 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
441 assertEquals("getNumBatched", 1, reply.getNumBatched());
443 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
444 batched.setReady(true);
445 batched.setTotalMessagesSent(2);
447 transaction.tell(batched, getRef());
448 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
449 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
454 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
455 new JavaTestKit(getSystem()) {{
457 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
458 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
460 JavaTestKit watcher = new JavaTestKit(getSystem());
461 watcher.watch(transaction);
463 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
464 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
465 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
466 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
468 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
469 batched.addModification(new WriteModification(writePath, writeData));
470 batched.setReady(true);
471 batched.setDoCommitOnReady(true);
472 batched.setTotalMessagesSent(1);
474 transaction.tell(batched, getRef());
475 expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
476 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
480 @Test(expected=TestException.class)
481 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
482 new JavaTestKit(getSystem()) {{
484 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
485 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
486 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
487 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
488 "testOnReceiveBatchedModificationsFailure");
490 JavaTestKit watcher = new JavaTestKit(getSystem());
491 watcher.watch(transaction);
493 YangInstanceIdentifier path = TestModel.TEST_PATH;
494 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
496 doThrow(new TestException()).when(mockModification).write(path, node);
498 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
499 batched.addModification(new WriteModification(path, node));
501 transaction.tell(batched, getRef());
502 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
504 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
505 batched.setReady(true);
506 batched.setTotalMessagesSent(2);
508 transaction.tell(batched, getRef());
509 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
510 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
512 if(failure != null) {
513 throw failure.cause();
518 @Test(expected=IllegalStateException.class)
519 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
520 new JavaTestKit(getSystem()) {{
522 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
523 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
525 JavaTestKit watcher = new JavaTestKit(getSystem());
526 watcher.watch(transaction);
528 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
529 batched.setReady(true);
530 batched.setTotalMessagesSent(2);
532 transaction.tell(batched, getRef());
534 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
535 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
537 if(failure != null) {
538 throw failure.cause();
544 public void testOnReceivePreLithiumReadyTransaction() throws Exception {
545 new JavaTestKit(getSystem()) {{
546 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
547 "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
549 JavaTestKit watcher = new JavaTestKit(getSystem());
550 watcher.watch(transaction);
552 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
554 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
555 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
559 new JavaTestKit(getSystem()) {{
560 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
561 "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
563 JavaTestKit watcher = new JavaTestKit(getSystem());
564 watcher.watch(transaction);
566 transaction.tell(new ReadyTransaction(), getRef());
568 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
569 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
574 public void testOnReceiveCreateSnapshot() throws Exception {
575 new JavaTestKit(getSystem()) {{
576 ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
577 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
579 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
580 YangInstanceIdentifier.builder().build());
582 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
583 "testOnReceiveCreateSnapshot");
587 transaction.tell(CreateSnapshot.INSTANCE, getRef());
589 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
591 assertNotNull("getSnapshot is null", reply.getSnapshot());
593 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
594 reply.getSnapshot());
596 assertEquals("Root node", expectedRoot, actualRoot);
598 expectTerminated(duration("3 seconds"), transaction);
603 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
604 new JavaTestKit(getSystem()) {{
605 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
606 "testReadWriteTxOnReceiveCloseTransaction");
610 transaction.tell(new CloseTransaction().toSerializable(), getRef());
612 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
613 expectTerminated(duration("3 seconds"), transaction);
618 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
619 new JavaTestKit(getSystem()) {{
620 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
621 "testWriteTxOnReceiveCloseTransaction");
625 transaction.tell(new CloseTransaction().toSerializable(), getRef());
627 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
628 expectTerminated(duration("3 seconds"), transaction);
633 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
634 new JavaTestKit(getSystem()) {{
635 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
636 "testReadOnlyTxOnReceiveCloseTransaction");
640 transaction.tell(new CloseTransaction().toSerializable(), getRef());
642 expectMsgClass(duration("3 seconds"), Terminated.class);
646 @Test(expected=UnknownMessageException.class)
647 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
648 final ActorRef shard = createShard();
649 final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
650 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
651 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
653 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
654 toSerializable(), ActorRef.noSender());
658 public void testShardTransactionInactivity() {
660 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
661 500, TimeUnit.MILLISECONDS).build();
663 new JavaTestKit(getSystem()) {{
664 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
665 "testShardTransactionInactivity");
669 expectMsgClass(duration("3 seconds"), Terminated.class);
673 public static class TestException extends RuntimeException {
674 private static final long serialVersionUID = 1L;