2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
27 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
28 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
29 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
38 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
39 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
40 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
41 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
45 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
46 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
47 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
48 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
49 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
50 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
51 import org.opendaylight.controller.cluster.datastore.modification.Modification;
52 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
53 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
54 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
55 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
56 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
57 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
58 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
59 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
60 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
62 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
63 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
64 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
65 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 public class ShardTransactionTest extends AbstractActorTest {
69 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
70 private static final TransactionType RO = TransactionType.READ_ONLY;
71 private static final TransactionType RW = TransactionType.READ_WRITE;
72 private static final TransactionType WO = TransactionType.WRITE_ONLY;
74 private static final ShardIdentifier SHARD_IDENTIFIER =
75 ShardIdentifier.builder().memberName("member-1")
76 .shardName("inventory").type("config").build();
78 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
80 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
82 private final ShardDataTree store = new ShardDataTree(testSchemaContext);
84 private int txCounter = 0;
86 private ActorRef createShard() {
87 return getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
88 schemaContext(TestModel.createTestContext()).props());
91 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
92 return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
95 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
96 return newTransactionActor(type, transaction, null, name, version);
99 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
100 return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
103 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
105 Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
106 datastoreContext, shardStats, "txn", version);
107 return getSystem().actorOf(props, name);
110 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
111 return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
114 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
115 return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
119 public void testOnReceiveReadData() throws Exception {
120 new JavaTestKit(getSystem()) {{
121 final ActorRef shard = createShard();
123 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
125 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
128 private void testOnReceiveReadData(final ActorRef transaction) {
130 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
133 Object replySerialized =
134 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
136 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
139 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
141 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
143 assertNotNull(reply.getNormalizedNode());
148 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
149 new JavaTestKit(getSystem()) {{
150 final ActorRef shard = createShard();
152 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
153 RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
155 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
156 RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
159 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
161 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
163 Object replySerialized =
164 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
166 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
169 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
171 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
173 assertTrue(reply.getNormalizedNode() == null);
178 public void testOnReceiveReadDataHeliumR1() throws Exception {
179 new JavaTestKit(getSystem()) {{
180 ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
181 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
183 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
186 ShardTransactionMessages.ReadDataReply replySerialized =
187 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
189 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
194 public void testOnReceiveDataExistsPositive() throws Exception {
195 new JavaTestKit(getSystem()) {{
196 final ActorRef shard = createShard();
198 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
199 "testDataExistsPositiveRO"));
201 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
202 "testDataExistsPositiveRW"));
205 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
206 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
209 ShardTransactionMessages.DataExistsReply replySerialized =
210 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
212 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
215 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
217 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
219 assertTrue(reply.exists());
224 public void testOnReceiveDataExistsNegative() throws Exception {
225 new JavaTestKit(getSystem()) {{
226 final ActorRef shard = createShard();
228 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
229 "testDataExistsNegativeRO"));
231 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
232 "testDataExistsNegativeRW"));
235 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
236 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
238 ShardTransactionMessages.DataExistsReply replySerialized =
239 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
241 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
244 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
246 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
248 assertFalse(reply.exists());
252 private void assertModification(final ActorRef subject,
253 final Class<? extends Modification> modificationType) {
254 new JavaTestKit(getSystem()) {{
255 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
257 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
258 GetCompositeModificationReply.class).getModification();
260 assertTrue(compositeModification.getModifications().size() == 1);
261 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
266 public void testOnReceiveWriteData() {
267 new JavaTestKit(getSystem()) {{
268 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
269 "testOnReceiveWriteData");
271 transaction.tell(new WriteData(TestModel.TEST_PATH,
272 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
273 toSerializable(), getRef());
275 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
277 assertModification(transaction, WriteModification.class);
279 // unserialized write
280 transaction.tell(new WriteData(TestModel.TEST_PATH,
281 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
284 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
289 public void testOnReceiveHeliumR1WriteData() {
290 new JavaTestKit(getSystem()) {{
291 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
292 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
294 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
295 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
296 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
297 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
298 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
300 transaction.tell(serialized, getRef());
302 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
304 assertModification(transaction, WriteModification.class);
309 public void testOnReceiveMergeData() {
310 new JavaTestKit(getSystem()) {{
311 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
314 transaction.tell(new MergeData(TestModel.TEST_PATH,
315 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
316 toSerializable(), getRef());
318 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
320 assertModification(transaction, MergeModification.class);
323 transaction.tell(new MergeData(TestModel.TEST_PATH,
324 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
327 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
332 public void testOnReceiveHeliumR1MergeData() throws Exception {
333 new JavaTestKit(getSystem()) {{
334 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
335 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
337 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
338 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
339 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
340 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
341 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
343 transaction.tell(serialized, getRef());
345 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
347 assertModification(transaction, MergeModification.class);
352 public void testOnReceiveDeleteData() throws Exception {
353 new JavaTestKit(getSystem()) {{
354 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
357 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
358 toSerializable(), getRef());
360 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
362 assertModification(transaction, DeleteModification.class);
365 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
367 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
372 public void testOnReceiveBatchedModifications() throws Exception {
373 new JavaTestKit(getSystem()) {{
375 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
376 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
377 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
378 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
380 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
381 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
382 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
383 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
385 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
386 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
387 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
389 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
391 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
392 batched.addModification(new WriteModification(writePath, writeData));
393 batched.addModification(new MergeModification(mergePath, mergeData));
394 batched.addModification(new DeleteModification(deletePath));
396 transaction.tell(batched, getRef());
398 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
399 assertEquals("getNumBatched", 3, reply.getNumBatched());
401 JavaTestKit verification = new JavaTestKit(getSystem());
402 transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
404 CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
405 GetCompositeModificationReply.class).getModification();
407 assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
409 WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
410 assertEquals("getPath", writePath, write.getPath());
411 assertEquals("getData", writeData, write.getData());
413 MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
414 assertEquals("getPath", mergePath, merge.getPath());
415 assertEquals("getData", mergeData, merge.getData());
417 DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
418 assertEquals("getPath", deletePath, delete.getPath());
420 InOrder inOrder = Mockito.inOrder(mockModification);
421 inOrder.verify(mockModification).write(writePath, writeData);
422 inOrder.verify(mockModification).merge(mergePath, mergeData);
423 inOrder.verify(mockModification).delete(deletePath);
428 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
429 new JavaTestKit(getSystem()) {{
431 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
432 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
434 JavaTestKit watcher = new JavaTestKit(getSystem());
435 watcher.watch(transaction);
437 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
438 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
439 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
440 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
442 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
443 batched.addModification(new WriteModification(writePath, writeData));
445 transaction.tell(batched, getRef());
446 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
447 assertEquals("getNumBatched", 1, reply.getNumBatched());
449 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
450 batched.setReady(true);
451 batched.setTotalMessagesSent(2);
453 transaction.tell(batched, getRef());
454 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
455 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
460 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
461 new JavaTestKit(getSystem()) {{
463 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
464 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
466 JavaTestKit watcher = new JavaTestKit(getSystem());
467 watcher.watch(transaction);
469 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
470 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
471 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
472 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
474 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
475 batched.addModification(new WriteModification(writePath, writeData));
476 batched.setReady(true);
477 batched.setDoCommitOnReady(true);
478 batched.setTotalMessagesSent(1);
480 transaction.tell(batched, getRef());
481 expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
482 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
486 @Test(expected=TestException.class)
487 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
488 new JavaTestKit(getSystem()) {{
490 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
491 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
492 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
493 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
494 "testOnReceiveBatchedModificationsFailure");
496 JavaTestKit watcher = new JavaTestKit(getSystem());
497 watcher.watch(transaction);
499 YangInstanceIdentifier path = TestModel.TEST_PATH;
500 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
502 doThrow(new TestException()).when(mockModification).write(path, node);
504 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
505 batched.addModification(new WriteModification(path, node));
507 transaction.tell(batched, getRef());
508 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
510 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
511 batched.setReady(true);
512 batched.setTotalMessagesSent(2);
514 transaction.tell(batched, getRef());
515 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
516 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
518 if(failure != null) {
519 throw failure.cause();
524 @Test(expected=IllegalStateException.class)
525 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
526 new JavaTestKit(getSystem()) {{
528 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
529 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
531 JavaTestKit watcher = new JavaTestKit(getSystem());
532 watcher.watch(transaction);
534 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
535 batched.setReady(true);
536 batched.setTotalMessagesSent(2);
538 transaction.tell(batched, getRef());
540 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
541 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
543 if(failure != null) {
544 throw failure.cause();
550 public void testOnReceivePreLithiumReadyTransaction() throws Exception {
551 new JavaTestKit(getSystem()) {{
552 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
553 "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
555 JavaTestKit watcher = new JavaTestKit(getSystem());
556 watcher.watch(transaction);
558 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
560 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
561 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
565 new JavaTestKit(getSystem()) {{
566 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
567 "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
569 JavaTestKit watcher = new JavaTestKit(getSystem());
570 watcher.watch(transaction);
572 transaction.tell(new ReadyTransaction(), getRef());
574 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
575 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
580 public void testOnReceiveCreateSnapshot() throws Exception {
581 new JavaTestKit(getSystem()) {{
582 ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
583 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
585 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
586 YangInstanceIdentifier.builder().build());
588 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
589 "testOnReceiveCreateSnapshot");
593 transaction.tell(CreateSnapshot.INSTANCE, getRef());
595 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
597 assertNotNull("getSnapshot is null", reply.getSnapshot());
599 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
600 reply.getSnapshot());
602 assertEquals("Root node", expectedRoot, actualRoot);
604 expectTerminated(duration("3 seconds"), transaction);
609 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
610 new JavaTestKit(getSystem()) {{
611 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
612 "testReadWriteTxOnReceiveCloseTransaction");
616 transaction.tell(new CloseTransaction().toSerializable(), getRef());
618 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
619 expectTerminated(duration("3 seconds"), transaction);
624 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
625 new JavaTestKit(getSystem()) {{
626 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
627 "testWriteTxOnReceiveCloseTransaction");
631 transaction.tell(new CloseTransaction().toSerializable(), getRef());
633 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
634 expectTerminated(duration("3 seconds"), transaction);
639 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
640 new JavaTestKit(getSystem()) {{
641 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
642 "testReadOnlyTxOnReceiveCloseTransaction");
646 transaction.tell(new CloseTransaction().toSerializable(), getRef());
648 expectMsgClass(duration("3 seconds"), Terminated.class);
652 @Test(expected=UnknownMessageException.class)
653 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
654 final ActorRef shard = createShard();
655 final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
656 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
657 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
659 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
660 toSerializable(), ActorRef.noSender());
664 public void testShardTransactionInactivity() {
666 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
667 500, TimeUnit.MILLISECONDS).build();
669 new JavaTestKit(getSystem()) {{
670 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
671 "testShardTransactionInactivity");
675 expectMsgClass(duration("3 seconds"), Terminated.class);
679 public static class TestException extends RuntimeException {
680 private static final long serialVersionUID = 1L;