2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
35 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
37 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
38 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
40 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
46 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
47 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
48 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
49 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
50 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
51 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
52 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
54 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
55 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
60 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
61 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
62 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
65 public class ShardTransactionTest extends AbstractActorTest {
67 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
68 private static final TransactionType RO = TransactionType.READ_ONLY;
69 private static final TransactionType RW = TransactionType.READ_WRITE;
70 private static final TransactionType WO = TransactionType.WRITE_ONLY;
72 private static final ShardIdentifier SHARD_IDENTIFIER =
73 ShardIdentifier.builder().memberName("member-1")
74 .shardName("inventory").type("config").build();
76 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
78 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
80 private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
82 private int txCounter = 0;
84 private ActorRef createShard() {
85 return getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
86 schemaContext(TestModel.createTestContext()).props());
89 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
90 return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
93 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
94 return newTransactionActor(type, transaction, null, name, version);
97 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
98 return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
101 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
103 Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
104 datastoreContext, shardStats, "txn", version);
105 return getSystem().actorOf(props, name);
108 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
109 return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
112 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
113 return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
117 public void testOnReceiveReadData() throws Exception {
118 new JavaTestKit(getSystem()) {{
119 final ActorRef shard = createShard();
121 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
123 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
126 private void testOnReceiveReadData(final ActorRef transaction) {
128 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
131 Object replySerialized =
132 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
134 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
137 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
139 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
141 assertNotNull(reply.getNormalizedNode());
146 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
147 new JavaTestKit(getSystem()) {{
148 final ActorRef shard = createShard();
150 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
151 RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
153 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
154 RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
157 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
159 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
161 Object replySerialized =
162 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
164 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
167 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
169 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
171 assertTrue(reply.getNormalizedNode() == null);
176 public void testOnReceiveReadDataHeliumR1() throws Exception {
177 new JavaTestKit(getSystem()) {{
178 ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
179 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
181 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
184 ShardTransactionMessages.ReadDataReply replySerialized =
185 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
187 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
192 public void testOnReceiveDataExistsPositive() throws Exception {
193 new JavaTestKit(getSystem()) {{
194 final ActorRef shard = createShard();
196 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
197 "testDataExistsPositiveRO"));
199 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
200 "testDataExistsPositiveRW"));
203 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
204 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
207 ShardTransactionMessages.DataExistsReply replySerialized =
208 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
210 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
213 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
215 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
217 assertTrue(reply.exists());
222 public void testOnReceiveDataExistsNegative() throws Exception {
223 new JavaTestKit(getSystem()) {{
224 final ActorRef shard = createShard();
226 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
227 "testDataExistsNegativeRO"));
229 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
230 "testDataExistsNegativeRW"));
233 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
234 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
236 ShardTransactionMessages.DataExistsReply replySerialized =
237 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
239 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
242 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
244 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
246 assertFalse(reply.exists());
251 public void testOnReceiveWriteData() {
252 new JavaTestKit(getSystem()) {{
253 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
254 "testOnReceiveWriteData");
256 transaction.tell(new WriteData(TestModel.TEST_PATH,
257 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
258 toSerializable(), getRef());
260 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
262 // unserialized write
263 transaction.tell(new WriteData(TestModel.TEST_PATH,
264 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
267 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
272 public void testOnReceiveHeliumR1WriteData() {
273 new JavaTestKit(getSystem()) {{
274 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
275 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
277 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
278 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
279 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
280 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
281 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
283 transaction.tell(serialized, getRef());
285 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
290 public void testOnReceiveMergeData() {
291 new JavaTestKit(getSystem()) {{
292 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
295 transaction.tell(new MergeData(TestModel.TEST_PATH,
296 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
297 toSerializable(), getRef());
299 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
302 transaction.tell(new MergeData(TestModel.TEST_PATH,
303 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
306 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
311 public void testOnReceiveHeliumR1MergeData() throws Exception {
312 new JavaTestKit(getSystem()) {{
313 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
314 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
316 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
317 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
318 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
319 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
320 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
322 transaction.tell(serialized, getRef());
324 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
329 public void testOnReceiveDeleteData() throws Exception {
330 new JavaTestKit(getSystem()) {{
331 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
334 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
335 toSerializable(), getRef());
337 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
340 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
342 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
347 public void testOnReceiveBatchedModifications() throws Exception {
348 new JavaTestKit(getSystem()) {{
350 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
351 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
352 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
353 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
355 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
356 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
357 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
358 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
360 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
361 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
362 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
364 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
366 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
367 batched.addModification(new WriteModification(writePath, writeData));
368 batched.addModification(new MergeModification(mergePath, mergeData));
369 batched.addModification(new DeleteModification(deletePath));
371 transaction.tell(batched, getRef());
373 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
374 assertEquals("getNumBatched", 3, reply.getNumBatched());
376 InOrder inOrder = Mockito.inOrder(mockModification);
377 inOrder.verify(mockModification).write(writePath, writeData);
378 inOrder.verify(mockModification).merge(mergePath, mergeData);
379 inOrder.verify(mockModification).delete(deletePath);
384 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
385 new JavaTestKit(getSystem()) {{
387 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
388 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
390 JavaTestKit watcher = new JavaTestKit(getSystem());
391 watcher.watch(transaction);
393 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
394 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
395 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
396 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
398 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
399 batched.addModification(new WriteModification(writePath, writeData));
401 transaction.tell(batched, getRef());
402 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
403 assertEquals("getNumBatched", 1, reply.getNumBatched());
405 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
406 batched.setReady(true);
407 batched.setTotalMessagesSent(2);
409 transaction.tell(batched, getRef());
410 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
411 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
416 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
417 new JavaTestKit(getSystem()) {{
419 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
420 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
422 JavaTestKit watcher = new JavaTestKit(getSystem());
423 watcher.watch(transaction);
425 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
426 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
427 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
428 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
430 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
431 batched.addModification(new WriteModification(writePath, writeData));
432 batched.setReady(true);
433 batched.setDoCommitOnReady(true);
434 batched.setTotalMessagesSent(1);
436 transaction.tell(batched, getRef());
437 expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
438 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
442 @Test(expected=TestException.class)
443 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
444 new JavaTestKit(getSystem()) {{
446 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
447 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
448 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
449 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
450 "testOnReceiveBatchedModificationsFailure");
452 JavaTestKit watcher = new JavaTestKit(getSystem());
453 watcher.watch(transaction);
455 YangInstanceIdentifier path = TestModel.TEST_PATH;
456 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
458 doThrow(new TestException()).when(mockModification).write(path, node);
460 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
461 batched.addModification(new WriteModification(path, node));
463 transaction.tell(batched, getRef());
464 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
466 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
467 batched.setReady(true);
468 batched.setTotalMessagesSent(2);
470 transaction.tell(batched, getRef());
471 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
472 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
474 if(failure != null) {
475 throw failure.cause();
480 @Test(expected=IllegalStateException.class)
481 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
482 new JavaTestKit(getSystem()) {{
484 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
485 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
487 JavaTestKit watcher = new JavaTestKit(getSystem());
488 watcher.watch(transaction);
490 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
491 batched.setReady(true);
492 batched.setTotalMessagesSent(2);
494 transaction.tell(batched, getRef());
496 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
497 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
499 if(failure != null) {
500 throw failure.cause();
506 public void testOnReceivePreLithiumReadyTransaction() throws Exception {
507 new JavaTestKit(getSystem()) {{
508 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
509 "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
511 JavaTestKit watcher = new JavaTestKit(getSystem());
512 watcher.watch(transaction);
514 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
516 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
517 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
521 new JavaTestKit(getSystem()) {{
522 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
523 "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
525 JavaTestKit watcher = new JavaTestKit(getSystem());
526 watcher.watch(transaction);
528 transaction.tell(new ReadyTransaction(), getRef());
530 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
531 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
536 public void testOnReceiveCreateSnapshot() throws Exception {
537 new JavaTestKit(getSystem()) {{
538 ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
539 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
541 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
542 YangInstanceIdentifier.builder().build());
544 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
545 "testOnReceiveCreateSnapshot");
549 transaction.tell(CreateSnapshot.INSTANCE, getRef());
551 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
553 assertNotNull("getSnapshot is null", reply.getSnapshot());
555 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
556 reply.getSnapshot());
558 assertEquals("Root node", expectedRoot, actualRoot);
560 expectTerminated(duration("3 seconds"), transaction);
565 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
566 new JavaTestKit(getSystem()) {{
567 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
568 "testReadWriteTxOnReceiveCloseTransaction");
572 transaction.tell(new CloseTransaction().toSerializable(), getRef());
574 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
575 expectTerminated(duration("3 seconds"), transaction);
580 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
581 new JavaTestKit(getSystem()) {{
582 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
583 "testWriteTxOnReceiveCloseTransaction");
587 transaction.tell(new CloseTransaction().toSerializable(), getRef());
589 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
590 expectTerminated(duration("3 seconds"), transaction);
595 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
596 new JavaTestKit(getSystem()) {{
597 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
598 "testReadOnlyTxOnReceiveCloseTransaction");
602 transaction.tell(new CloseTransaction().toSerializable(), getRef());
604 expectMsgClass(duration("3 seconds"), Terminated.class);
608 @Test(expected=UnknownMessageException.class)
609 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
610 final ActorRef shard = createShard();
611 final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
612 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
613 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
615 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
616 toSerializable(), ActorRef.noSender());
620 public void testShardTransactionInactivity() {
622 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
623 500, TimeUnit.MILLISECONDS).build();
625 new JavaTestKit(getSystem()) {{
626 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
627 "testShardTransactionInactivity");
631 expectMsgClass(duration("3 seconds"), Terminated.class);
635 public static class TestException extends RuntimeException {
636 private static final long serialVersionUID = 1L;