/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
35 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
37 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
38 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
40 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
46 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
47 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
48 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
49 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
50 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
51 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
52 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
54 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
55 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
60 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
61 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
62 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
65 public class ShardTransactionTest extends AbstractActorTest {
67 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
68 private static final TransactionType RO = TransactionType.READ_ONLY;
69 private static final TransactionType RW = TransactionType.READ_WRITE;
70 private static final TransactionType WO = TransactionType.WRITE_ONLY;
72 private static final ShardIdentifier SHARD_IDENTIFIER =
73 ShardIdentifier.builder().memberName("member-1")
74 .shardName("inventory").type("config").build();
76 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
78 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
80 private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
82 private int txCounter = 0;
84 private ActorRef createShard() {
85 ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
86 schemaContext(TestModel.createTestContext()).props());
87 ShardTestKit.waitUntilLeader(shard);
91 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
92 return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
95 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
96 return newTransactionActor(type, transaction, null, name, version);
99 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
100 return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
103 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
105 Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
106 datastoreContext, shardStats, "txn", version);
107 return getSystem().actorOf(props, name);
110 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
111 return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
114 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
115 return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
119 public void testOnReceiveReadData() throws Exception {
120 new JavaTestKit(getSystem()) {{
121 final ActorRef shard = createShard();
123 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
125 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
128 private void testOnReceiveReadData(final ActorRef transaction) {
130 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
133 Object replySerialized =
134 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
136 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
139 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
141 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
143 assertNotNull(reply.getNormalizedNode());
148 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
149 new JavaTestKit(getSystem()) {{
150 final ActorRef shard = createShard();
152 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
153 RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
155 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
156 RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
159 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
161 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
163 Object replySerialized =
164 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
166 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
169 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
171 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
173 assertTrue(reply.getNormalizedNode() == null);
178 public void testOnReceiveReadDataHeliumR1() throws Exception {
179 new JavaTestKit(getSystem()) {{
180 ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
181 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
183 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
186 ShardTransactionMessages.ReadDataReply replySerialized =
187 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
189 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
194 public void testOnReceiveDataExistsPositive() throws Exception {
195 new JavaTestKit(getSystem()) {{
196 final ActorRef shard = createShard();
198 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
199 "testDataExistsPositiveRO"));
201 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
202 "testDataExistsPositiveRW"));
205 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
206 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
209 ShardTransactionMessages.DataExistsReply replySerialized =
210 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
212 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
215 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
217 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
219 assertTrue(reply.exists());
224 public void testOnReceiveDataExistsNegative() throws Exception {
225 new JavaTestKit(getSystem()) {{
226 final ActorRef shard = createShard();
228 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
229 "testDataExistsNegativeRO"));
231 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
232 "testDataExistsNegativeRW"));
235 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
236 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
238 ShardTransactionMessages.DataExistsReply replySerialized =
239 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
241 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
244 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
246 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
248 assertFalse(reply.exists());
253 public void testOnReceiveWriteData() {
254 new JavaTestKit(getSystem()) {{
255 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
256 "testOnReceiveWriteData");
258 transaction.tell(new WriteData(TestModel.TEST_PATH,
259 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
260 toSerializable(), getRef());
262 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
264 // unserialized write
265 transaction.tell(new WriteData(TestModel.TEST_PATH,
266 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
269 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
274 public void testOnReceiveHeliumR1WriteData() {
275 new JavaTestKit(getSystem()) {{
276 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
277 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
279 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
280 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
281 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
282 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
283 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
285 transaction.tell(serialized, getRef());
287 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
292 public void testOnReceiveMergeData() {
293 new JavaTestKit(getSystem()) {{
294 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
297 transaction.tell(new MergeData(TestModel.TEST_PATH,
298 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
299 toSerializable(), getRef());
301 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
304 transaction.tell(new MergeData(TestModel.TEST_PATH,
305 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
308 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
313 public void testOnReceiveHeliumR1MergeData() throws Exception {
314 new JavaTestKit(getSystem()) {{
315 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
316 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
318 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
319 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
320 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
321 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
322 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
324 transaction.tell(serialized, getRef());
326 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
331 public void testOnReceiveDeleteData() throws Exception {
332 new JavaTestKit(getSystem()) {{
333 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
336 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
337 toSerializable(), getRef());
339 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
342 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
344 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
349 public void testOnReceiveBatchedModifications() throws Exception {
350 new JavaTestKit(getSystem()) {{
352 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
353 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
354 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
355 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
357 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
358 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
359 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
360 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
362 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
363 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
364 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
366 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
368 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
369 batched.addModification(new WriteModification(writePath, writeData));
370 batched.addModification(new MergeModification(mergePath, mergeData));
371 batched.addModification(new DeleteModification(deletePath));
373 transaction.tell(batched, getRef());
375 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
376 assertEquals("getNumBatched", 3, reply.getNumBatched());
378 InOrder inOrder = Mockito.inOrder(mockModification);
379 inOrder.verify(mockModification).write(writePath, writeData);
380 inOrder.verify(mockModification).merge(mergePath, mergeData);
381 inOrder.verify(mockModification).delete(deletePath);
386 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
387 new JavaTestKit(getSystem()) {{
389 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
390 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
392 JavaTestKit watcher = new JavaTestKit(getSystem());
393 watcher.watch(transaction);
395 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
396 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
397 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
398 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
400 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
401 batched.addModification(new WriteModification(writePath, writeData));
403 transaction.tell(batched, getRef());
404 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
405 assertEquals("getNumBatched", 1, reply.getNumBatched());
407 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
408 batched.setReady(true);
409 batched.setTotalMessagesSent(2);
411 transaction.tell(batched, getRef());
412 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
413 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
418 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
419 new JavaTestKit(getSystem()) {{
421 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
422 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
424 JavaTestKit watcher = new JavaTestKit(getSystem());
425 watcher.watch(transaction);
427 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
428 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
429 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
430 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
432 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
433 batched.addModification(new WriteModification(writePath, writeData));
434 batched.setReady(true);
435 batched.setDoCommitOnReady(true);
436 batched.setTotalMessagesSent(1);
438 transaction.tell(batched, getRef());
439 expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
440 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
444 @Test(expected=TestException.class)
445 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
446 new JavaTestKit(getSystem()) {{
448 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
449 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
450 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
451 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
452 "testOnReceiveBatchedModificationsFailure");
454 JavaTestKit watcher = new JavaTestKit(getSystem());
455 watcher.watch(transaction);
457 YangInstanceIdentifier path = TestModel.TEST_PATH;
458 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
460 doThrow(new TestException()).when(mockModification).write(path, node);
462 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
463 batched.addModification(new WriteModification(path, node));
465 transaction.tell(batched, getRef());
466 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
468 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
469 batched.setReady(true);
470 batched.setTotalMessagesSent(2);
472 transaction.tell(batched, getRef());
473 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
474 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
476 if(failure != null) {
477 throw failure.cause();
482 @Test(expected=IllegalStateException.class)
483 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
484 new JavaTestKit(getSystem()) {{
486 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
487 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
489 JavaTestKit watcher = new JavaTestKit(getSystem());
490 watcher.watch(transaction);
492 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
493 batched.setReady(true);
494 batched.setTotalMessagesSent(2);
496 transaction.tell(batched, getRef());
498 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
499 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
501 if(failure != null) {
502 throw failure.cause();
508 public void testOnReceivePreLithiumReadyTransaction() throws Exception {
509 new JavaTestKit(getSystem()) {{
510 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
511 "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
513 JavaTestKit watcher = new JavaTestKit(getSystem());
514 watcher.watch(transaction);
516 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
518 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
519 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
523 new JavaTestKit(getSystem()) {{
524 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
525 "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
527 JavaTestKit watcher = new JavaTestKit(getSystem());
528 watcher.watch(transaction);
530 transaction.tell(new ReadyTransaction(), getRef());
532 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
533 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
538 public void testOnReceiveCreateSnapshot() throws Exception {
539 new JavaTestKit(getSystem()) {{
540 ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
541 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
543 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
544 YangInstanceIdentifier.builder().build());
546 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
547 "testOnReceiveCreateSnapshot");
551 transaction.tell(CreateSnapshot.INSTANCE, getRef());
553 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
555 assertNotNull("getSnapshot is null", reply.getSnapshot());
557 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
558 reply.getSnapshot());
560 assertEquals("Root node", expectedRoot, actualRoot);
562 expectTerminated(duration("3 seconds"), transaction);
567 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
568 new JavaTestKit(getSystem()) {{
569 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
570 "testReadWriteTxOnReceiveCloseTransaction");
574 transaction.tell(new CloseTransaction().toSerializable(), getRef());
576 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
577 expectTerminated(duration("3 seconds"), transaction);
582 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
583 new JavaTestKit(getSystem()) {{
584 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
585 "testWriteTxOnReceiveCloseTransaction");
589 transaction.tell(new CloseTransaction().toSerializable(), getRef());
591 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
592 expectTerminated(duration("3 seconds"), transaction);
597 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
598 new JavaTestKit(getSystem()) {{
599 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
600 "testReadOnlyTxOnReceiveCloseTransaction");
604 transaction.tell(new CloseTransaction().toSerializable(), getRef());
606 expectMsgClass(duration("3 seconds"), Terminated.class);
610 @Test(expected=UnknownMessageException.class)
611 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
612 final ActorRef shard = createShard();
613 final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
614 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
615 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
617 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
618 toSerializable(), ActorRef.noSender());
622 public void testShardTransactionInactivity() {
624 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
625 500, TimeUnit.MILLISECONDS).build();
627 new JavaTestKit(getSystem()) {{
628 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
629 "testShardTransactionInactivity");
633 expectMsgClass(duration("3 seconds"), Terminated.class);
637 public static class TestException extends RuntimeException {
638 private static final long serialVersionUID = 1L;