2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.Collections;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.Test;
25 import org.mockito.InOrder;
26 import org.mockito.Mockito;
27 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
28 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
30 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
38 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
39 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
40 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
42 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
45 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
48 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
49 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
50 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
51 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
52 import org.opendaylight.controller.cluster.datastore.modification.Modification;
53 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
54 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
55 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
56 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
58 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
59 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
60 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
61 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
62 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
63 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
64 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
65 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
68 public class ShardTransactionTest extends AbstractActorTest {
70 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
71 private static final TransactionType RO = TransactionType.READ_ONLY;
72 private static final TransactionType RW = TransactionType.READ_WRITE;
73 private static final TransactionType WO = TransactionType.WRITE_ONLY;
75 private static final ShardIdentifier SHARD_IDENTIFIER =
76 ShardIdentifier.builder().memberName("member-1")
77 .shardName("inventory").type("config").build();
79 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
81 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
83 private final ShardDataTree store = new ShardDataTree(testSchemaContext);
85 private int txCounter = 0;
87 private ActorRef createShard() {
88 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
89 Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
92 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
93 return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
96 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
97 return newTransactionActor(type, transaction, null, name, version);
100 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
101 return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
104 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
106 Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
107 datastoreContext, shardStats, "txn", version);
108 return getSystem().actorOf(props, name);
111 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
112 return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
115 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
116 return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
120 public void testOnReceiveReadData() throws Exception {
121 new JavaTestKit(getSystem()) {{
122 final ActorRef shard = createShard();
124 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
126 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
129 private void testOnReceiveReadData(final ActorRef transaction) {
131 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
134 Object replySerialized =
135 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
137 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
140 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
142 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
144 assertNotNull(reply.getNormalizedNode());
149 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
150 new JavaTestKit(getSystem()) {{
151 final ActorRef shard = createShard();
153 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
154 RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
156 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
157 RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
160 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
162 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
164 Object replySerialized =
165 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
167 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
170 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
172 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
174 assertTrue(reply.getNormalizedNode() == null);
179 public void testOnReceiveReadDataHeliumR1() throws Exception {
180 new JavaTestKit(getSystem()) {{
181 ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
182 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
184 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
187 ShardTransactionMessages.ReadDataReply replySerialized =
188 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
190 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
195 public void testOnReceiveDataExistsPositive() throws Exception {
196 new JavaTestKit(getSystem()) {{
197 final ActorRef shard = createShard();
199 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
200 "testDataExistsPositiveRO"));
202 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
203 "testDataExistsPositiveRW"));
206 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
207 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
210 ShardTransactionMessages.DataExistsReply replySerialized =
211 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
213 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
216 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
218 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
220 assertTrue(reply.exists());
225 public void testOnReceiveDataExistsNegative() throws Exception {
226 new JavaTestKit(getSystem()) {{
227 final ActorRef shard = createShard();
229 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
230 "testDataExistsNegativeRO"));
232 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
233 "testDataExistsNegativeRW"));
236 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
237 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
239 ShardTransactionMessages.DataExistsReply replySerialized =
240 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
242 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
245 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
247 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
249 assertFalse(reply.exists());
253 private void assertModification(final ActorRef subject,
254 final Class<? extends Modification> modificationType) {
255 new JavaTestKit(getSystem()) {{
256 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
258 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
259 GetCompositeModificationReply.class).getModification();
261 assertTrue(compositeModification.getModifications().size() == 1);
262 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
267 public void testOnReceiveWriteData() {
268 new JavaTestKit(getSystem()) {{
269 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
270 "testOnReceiveWriteData");
272 transaction.tell(new WriteData(TestModel.TEST_PATH,
273 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
274 toSerializable(), getRef());
276 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
278 assertModification(transaction, WriteModification.class);
280 // unserialized write
281 transaction.tell(new WriteData(TestModel.TEST_PATH,
282 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
285 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
290 public void testOnReceiveHeliumR1WriteData() {
291 new JavaTestKit(getSystem()) {{
292 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
293 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
295 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
296 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
297 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
298 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
299 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
301 transaction.tell(serialized, getRef());
303 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
305 assertModification(transaction, WriteModification.class);
310 public void testOnReceiveMergeData() {
311 new JavaTestKit(getSystem()) {{
312 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
315 transaction.tell(new MergeData(TestModel.TEST_PATH,
316 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
317 toSerializable(), getRef());
319 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
321 assertModification(transaction, MergeModification.class);
324 transaction.tell(new MergeData(TestModel.TEST_PATH,
325 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
328 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
333 public void testOnReceiveHeliumR1MergeData() throws Exception {
334 new JavaTestKit(getSystem()) {{
335 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
336 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
338 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
339 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
340 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
341 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
342 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
344 transaction.tell(serialized, getRef());
346 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
348 assertModification(transaction, MergeModification.class);
353 public void testOnReceiveDeleteData() throws Exception {
354 new JavaTestKit(getSystem()) {{
355 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
358 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
359 toSerializable(), getRef());
361 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
363 assertModification(transaction, DeleteModification.class);
366 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
368 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
373 public void testOnReceiveBatchedModifications() throws Exception {
374 new JavaTestKit(getSystem()) {{
376 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
377 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
378 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
379 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
381 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
382 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
383 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
384 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
386 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
387 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
388 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
390 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
392 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
393 batched.addModification(new WriteModification(writePath, writeData));
394 batched.addModification(new MergeModification(mergePath, mergeData));
395 batched.addModification(new DeleteModification(deletePath));
397 transaction.tell(batched, getRef());
399 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
400 assertEquals("getNumBatched", 3, reply.getNumBatched());
402 JavaTestKit verification = new JavaTestKit(getSystem());
403 transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
405 CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
406 GetCompositeModificationReply.class).getModification();
408 assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
410 WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
411 assertEquals("getPath", writePath, write.getPath());
412 assertEquals("getData", writeData, write.getData());
414 MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
415 assertEquals("getPath", mergePath, merge.getPath());
416 assertEquals("getData", mergeData, merge.getData());
418 DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
419 assertEquals("getPath", deletePath, delete.getPath());
421 InOrder inOrder = Mockito.inOrder(mockModification);
422 inOrder.verify(mockModification).write(writePath, writeData);
423 inOrder.verify(mockModification).merge(mergePath, mergeData);
424 inOrder.verify(mockModification).delete(deletePath);
429 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
430 new JavaTestKit(getSystem()) {{
432 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
433 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
435 JavaTestKit watcher = new JavaTestKit(getSystem());
436 watcher.watch(transaction);
438 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
439 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
440 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
441 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
443 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
444 batched.addModification(new WriteModification(writePath, writeData));
446 transaction.tell(batched, getRef());
447 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
448 assertEquals("getNumBatched", 1, reply.getNumBatched());
450 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
451 batched.setReady(true);
452 batched.setTotalMessagesSent(2);
454 transaction.tell(batched, getRef());
455 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
456 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
461 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
462 new JavaTestKit(getSystem()) {{
464 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
465 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
467 JavaTestKit watcher = new JavaTestKit(getSystem());
468 watcher.watch(transaction);
470 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
471 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
472 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
473 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
475 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
476 batched.addModification(new WriteModification(writePath, writeData));
477 batched.setReady(true);
478 batched.setDoCommitOnReady(true);
479 batched.setTotalMessagesSent(1);
481 transaction.tell(batched, getRef());
482 expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
483 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
487 @Test(expected=TestException.class)
488 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
489 new JavaTestKit(getSystem()) {{
491 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
492 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
493 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
494 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
495 "testOnReceiveBatchedModificationsFailure");
497 JavaTestKit watcher = new JavaTestKit(getSystem());
498 watcher.watch(transaction);
500 YangInstanceIdentifier path = TestModel.TEST_PATH;
501 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
503 doThrow(new TestException()).when(mockModification).write(path, node);
505 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
506 batched.addModification(new WriteModification(path, node));
508 transaction.tell(batched, getRef());
509 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
511 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
512 batched.setReady(true);
513 batched.setTotalMessagesSent(2);
515 transaction.tell(batched, getRef());
516 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
517 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
519 if(failure != null) {
520 throw failure.cause();
525 @Test(expected=IllegalStateException.class)
526 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
527 new JavaTestKit(getSystem()) {{
529 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
530 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
532 JavaTestKit watcher = new JavaTestKit(getSystem());
533 watcher.watch(transaction);
535 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
536 batched.setReady(true);
537 batched.setTotalMessagesSent(2);
539 transaction.tell(batched, getRef());
541 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
542 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
544 if(failure != null) {
545 throw failure.cause();
551 public void testOnReceivePreLithiumReadyTransaction() throws Exception {
552 new JavaTestKit(getSystem()) {{
553 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
554 "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
556 JavaTestKit watcher = new JavaTestKit(getSystem());
557 watcher.watch(transaction);
559 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
561 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
562 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
566 new JavaTestKit(getSystem()) {{
567 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
568 "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
570 JavaTestKit watcher = new JavaTestKit(getSystem());
571 watcher.watch(transaction);
573 transaction.tell(new ReadyTransaction(), getRef());
575 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
576 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
581 public void testOnReceiveCreateSnapshot() throws Exception {
582 new JavaTestKit(getSystem()) {{
583 ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
584 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
586 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
587 YangInstanceIdentifier.builder().build());
589 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
590 "testOnReceiveCreateSnapshot");
594 transaction.tell(CreateSnapshot.INSTANCE, getRef());
596 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
598 assertNotNull("getSnapshot is null", reply.getSnapshot());
600 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
601 reply.getSnapshot());
603 assertEquals("Root node", expectedRoot, actualRoot);
605 expectTerminated(duration("3 seconds"), transaction);
610 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
611 new JavaTestKit(getSystem()) {{
612 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
613 "testReadWriteTxOnReceiveCloseTransaction");
617 transaction.tell(new CloseTransaction().toSerializable(), getRef());
619 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
620 expectTerminated(duration("3 seconds"), transaction);
625 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
626 new JavaTestKit(getSystem()) {{
627 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
628 "testWriteTxOnReceiveCloseTransaction");
632 transaction.tell(new CloseTransaction().toSerializable(), getRef());
634 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
635 expectTerminated(duration("3 seconds"), transaction);
640 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
641 new JavaTestKit(getSystem()) {{
642 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
643 "testReadOnlyTxOnReceiveCloseTransaction");
647 transaction.tell(new CloseTransaction().toSerializable(), getRef());
649 expectMsgClass(duration("3 seconds"), Terminated.class);
653 @Test(expected=UnknownMessageException.class)
654 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
655 final ActorRef shard = createShard();
656 final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
657 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
658 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
660 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
661 toSerializable(), ActorRef.noSender());
665 public void testShardTransactionInactivity() {
667 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
668 500, TimeUnit.MILLISECONDS).build();
670 new JavaTestKit(getSystem()) {{
671 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
672 "testShardTransactionInactivity");
676 expectMsgClass(duration("3 seconds"), Terminated.class);
680 public static class TestException extends RuntimeException {
681 private static final long serialVersionUID = 1L;