1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import static org.mockito.Mockito.doThrow;
8 import akka.actor.ActorRef;
9 import akka.actor.Props;
10 import akka.actor.Status.Failure;
11 import akka.actor.Terminated;
12 import akka.testkit.JavaTestKit;
13 import akka.testkit.TestActorRef;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.Collections;
16 import java.util.concurrent.TimeUnit;
17 import org.junit.Before;
18 import org.junit.Test;
19 import org.mockito.InOrder;
20 import org.mockito.Mockito;
21 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
22 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
23 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
42 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
44 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
45 import org.opendaylight.controller.cluster.datastore.modification.Modification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
48 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
49 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
51 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
52 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
53 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
54 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
59 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
60 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
61 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
 * Tests for the ShardTransaction actor. Each test creates a transaction actor backed by an
 * in-memory data store, sends it messages and checks the replies it sends back.
 */
63 public class ShardTransactionTest extends AbstractActorTest {
// Schema context built once from the test model; given to the store and to every transaction actor.
65 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
// Identifier used for every shard created by these tests (member-1 / inventory / config).
67 private static final ShardIdentifier SHARD_IDENTIFIER =
68 ShardIdentifier.builder().memberName("member-1")
69 .shardName("inventory").type("config").build();
// Settings passed to shard and transaction actors. Not final: testShardTransactionInactivity
// replaces it with a context that has a short transaction idle timeout.
71 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
// Stats object passed to every transaction actor created by newTransactionActor.
73 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
// In-memory store that supplies the transactions under test; constructed with a same-thread executor.
75 private final InMemoryDOMDataStore store =
76 new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
// NOTE(review): the header of the method that encloses this statement is not visible here
// (presumably a JUnit @Before method - confirm). The call hands the test schema context to the store.
80 store.onGlobalContextUpdated(testSchemaContext);
/**
 * Creates a new Shard actor in the test actor system for SHARD_IDENTIFIER, using an empty
 * String-to-String map, the current datastoreContext and a freshly created test schema context.
 *
 * @return reference to the newly created shard actor
 */
83 private ActorRef createShard(){
84 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
85 Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
/**
 * Creates a transaction actor at DataStoreVersions.CURRENT_VERSION without supplying a shard.
 *
 * @param transaction store transaction the actor operates on
 * @param name name of the new actor
 * @return reference to the new transaction actor
 */
88 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
89 return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
/**
 * Creates a transaction actor for the given version without supplying a shard.
 *
 * @param transaction store transaction the actor operates on
 * @param name name of the new actor
 * @param version datastore version passed to the transaction actor
 * @return reference to the new transaction actor
 */
92 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
// A null shard makes the four-argument overload create a shard of its own.
93 return newTransactionActor(transaction, null, name, version);
/**
 * Creates a transaction actor at DataStoreVersions.CURRENT_VERSION that is attached to the
 * given shard.
 *
 * @param transaction store transaction the actor operates on
 * @param shard shard actor to pass to the transaction actor; when null, the four-argument
 *              overload creates a new shard instead
 * @param name name of the new actor
 * @return reference to the new transaction actor
 */
96 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
// Forward the caller's shard; passing null here would ignore the argument and create
// an extra shard actor for every transaction actor.
97 return newTransactionActor(transaction, shard, name, DataStoreVersions.CURRENT_VERSION);
/**
 * Creates a transaction actor with the given name from ShardTransaction.props.
 * NOTE(review): the end of the parameter list is not visible here; the body also uses a
 * {@code version} value, presumably the final parameter - confirm.
 *
 * @param transaction store transaction the actor operates on
 * @param shard shard actor to use; when null a new one is created via createShard()
 * @param name name of the new actor
 * @return reference to the new transaction actor
 */
100 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
// Fall back to a freshly created shard when the caller did not supply one.
102 Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
103 testSchemaContext, datastoreContext, shardStats, "txn", version);
104 return getSystem().actorOf(props, name);
/**
 * Checks that ReadData for the empty (root) path returns a non-null node, for both a
 * read-only and a read-write transaction actor, in serialized and unserialized form.
 */
108 public void testOnReceiveReadData() throws Exception {
109 new JavaTestKit(getSystem()) {{
110 final ActorRef shard = createShard();
// Run the same checks against a read-only and a read-write transaction.
112 testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
114 testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
// Helper: sends ReadData twice (serialized, then as a plain object) and checks each reply.
117 private void testOnReceiveReadData(final ActorRef transaction) {
// Serialized request: the reply is expected in its serializable class.
119 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
122 Object replySerialized =
123 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
125 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
// Unserialized request: the reply is expected as a ReadDataReply object.
128 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
130 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
132 assertNotNull(reply.getNormalizedNode());
/**
 * Checks that ReadData for TestModel.TEST_PATH returns a reply whose node is null, for both
 * a read-only and a read-write transaction actor, in serialized and unserialized form.
 */
137 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
138 new JavaTestKit(getSystem()) {{
139 final ActorRef shard = createShard();
141 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
142 store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
144 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
145 store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
// Helper: sends ReadData twice (serialized, then as a plain object) and expects a null node each time.
148 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
// Serialized request.
150 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
152 Object replySerialized =
153 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
155 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
// Unserialized request.
158 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
160 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
162 assertTrue(reply.getNormalizedNode() == null);
/**
 * Checks that a read-only transaction actor created at HELIUM_1_VERSION answers a serialized
 * ReadData for the empty (root) path with a ShardTransactionMessages.ReadDataReply carrying
 * a non-null node.
 */
167 public void testOnReceiveReadDataHeliumR1() throws Exception {
168 new JavaTestKit(getSystem()) {{
169 ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
170 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
172 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
// The reply must arrive as the protobuf message type.
175 ShardTransactionMessages.ReadDataReply replySerialized =
176 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
178 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
/**
 * Checks that DataExists for the empty (root) path is answered with exists() == true, for
 * both a read-only and a read-write transaction actor, in serialized and unserialized form.
 */
183 public void testOnReceiveDataExistsPositive() throws Exception {
184 new JavaTestKit(getSystem()) {{
185 final ActorRef shard = createShard();
187 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
188 "testDataExistsPositiveRO"));
190 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
191 "testDataExistsPositiveRW"));
// Helper: sends DataExists twice (serialized, then as a plain object) and expects a positive answer.
194 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
// Serialized request.
195 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
198 ShardTransactionMessages.DataExistsReply replySerialized =
199 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
201 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
// Unserialized request.
204 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
206 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
208 assertTrue(reply.exists());
/**
 * Checks that DataExists for TestModel.TEST_PATH is answered with exists() == false, for
 * both a read-only and a read-write transaction actor, in serialized and unserialized form.
 */
213 public void testOnReceiveDataExistsNegative() throws Exception {
214 new JavaTestKit(getSystem()) {{
215 final ActorRef shard = createShard();
217 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
218 "testDataExistsNegativeRO"));
220 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
221 "testDataExistsNegativeRW"));
// Helper: sends DataExists twice (serialized, then as a plain object) and expects a negative answer.
224 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
// Serialized request.
225 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
227 ShardTransactionMessages.DataExistsReply replySerialized =
228 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
230 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
// Unserialized request.
233 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
235 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
237 assertFalse(reply.exists());
/**
 * Asks the given transaction actor for its composite modification and asserts that it holds
 * exactly one modification of the expected type.
 *
 * @param subject transaction actor to query
 * @param modificationType expected class of the single recorded modification
 */
241 private void assertModification(final ActorRef subject,
242 final Class<? extends Modification> modificationType) {
// A separate test kit is used so the reply is received on its own test actor.
243 new JavaTestKit(getSystem()) {{
244 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
246 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
247 GetCompositeModificationReply.class).getModification();
// assertEquals reports the actual size on failure, which assertTrue(size == 1) does not.
249 assertEquals("CompositeModification size", 1, compositeModification.getModifications().size());
250 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
/**
 * Checks that a write-only transaction actor acknowledges WriteData: a serialized
 * HELIUM_2_VERSION message is answered with ShardTransactionMessages.WriteDataReply and
 * recorded as a WriteModification; an unserialized CURRENT_VERSION message is answered with
 * WriteDataReply.
 */
255 public void testOnReceiveWriteData() throws Exception {
256 new JavaTestKit(getSystem()) {{
257 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
258 "testOnReceiveWriteData");
// Serialized write.
260 transaction.tell(new WriteData(TestModel.TEST_PATH,
261 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
262 toSerializable(), getRef());
264 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
// Exactly one WriteModification should have been recorded at this point.
266 assertModification(transaction, WriteModification.class);
268 // unserialized write
269 transaction.tell(new WriteData(TestModel.TEST_PATH,
270 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
273 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
/**
 * Checks that a write-only transaction actor created at HELIUM_1_VERSION accepts a
 * hand-built ShardTransactionMessages.WriteData message, replies with
 * ShardTransactionMessages.WriteDataReply and records a single WriteModification.
 */
278 public void testOnReceiveHeliumR1WriteData() throws Exception {
279 new JavaTestKit(getSystem()) {{
280 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
281 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
// Build the protobuf message directly from the encoded path and node.
283 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
284 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
285 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
286 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
287 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
289 transaction.tell(serialized, getRef());
291 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
293 assertModification(transaction, WriteModification.class);
/**
 * Checks that a read-write transaction actor acknowledges MergeData: a serialized
 * HELIUM_2_VERSION message is answered with ShardTransactionMessages.MergeDataReply and
 * recorded as a MergeModification; an unserialized CURRENT_VERSION message is answered with
 * MergeDataReply.
 */
298 public void testOnReceiveMergeData() throws Exception {
299 new JavaTestKit(getSystem()) {{
300 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
// Serialized merge.
303 transaction.tell(new MergeData(TestModel.TEST_PATH,
304 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
305 toSerializable(), getRef());
307 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
// Exactly one MergeModification should have been recorded at this point.
309 assertModification(transaction, MergeModification.class);
// Unserialized merge.
312 transaction.tell(new MergeData(TestModel.TEST_PATH,
313 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
316 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
/**
 * Checks that a write-only transaction actor created at HELIUM_1_VERSION accepts a
 * hand-built ShardTransactionMessages.MergeData message, replies with
 * ShardTransactionMessages.MergeDataReply and records a single MergeModification.
 */
321 public void testOnReceiveHeliumR1MergeData() throws Exception {
322 new JavaTestKit(getSystem()) {{
323 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
324 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
// Build the protobuf message directly from the encoded path and node.
326 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
327 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
328 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
329 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
330 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
332 transaction.tell(serialized, getRef());
334 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
336 assertModification(transaction, MergeModification.class);
/**
 * Checks that a write-only transaction actor acknowledges DeleteData: a serialized
 * HELIUM_2_VERSION message is answered with ShardTransactionMessages.DeleteDataReply and
 * recorded as a DeleteModification; an unserialized CURRENT_VERSION message is answered with
 * DeleteDataReply.
 */
341 public void testOnReceiveDeleteData() throws Exception {
342 new JavaTestKit(getSystem()) {{
343 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
// Serialized delete.
346 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
347 toSerializable(), getRef());
349 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
// Exactly one DeleteModification should have been recorded at this point.
351 assertModification(transaction, DeleteModification.class);
// Unserialized delete.
354 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
356 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
/**
 * Sends one BatchedModifications message holding a write, a merge and a delete to a
 * transaction actor backed by a mocked write transaction. Verifies the reply count, the
 * recorded composite modification and that the mock received the three operations in order.
 */
361 public void testOnReceiveBatchedModifications() throws Exception {
362 new JavaTestKit(getSystem()) {{
// A mock transaction lets the test verify the calls made by the actor.
364 DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
365 final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
367 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
368 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
369 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
370 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
372 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
373 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
374 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
376 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
// Batch the three modifications in the order write, merge, delete.
378 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
379 batched.addModification(new WriteModification(writePath, writeData));
380 batched.addModification(new MergeModification(mergePath, mergeData));
381 batched.addModification(new DeleteModification(deletePath));
383 transaction.tell(batched, getRef());
385 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
386 assertEquals("getNumBatched", 3, reply.getNumBatched());
// Query the recorded modifications through a separate test kit.
388 JavaTestKit verification = new JavaTestKit(getSystem());
389 transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
391 CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
392 GetCompositeModificationReply.class).getModification();
394 assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
// The recorded modifications must match the batch in order, path and data.
396 WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
397 assertEquals("getPath", writePath, write.getPath());
398 assertEquals("getData", writeData, write.getData());
400 MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
401 assertEquals("getPath", mergePath, merge.getPath());
402 assertEquals("getData", mergeData, merge.getData());
404 DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
405 assertEquals("getPath", deletePath, delete.getPath());
// The underlying transaction must have seen the same operations in the same order.
407 InOrder inOrder = Mockito.inOrder(mockWriteTx);
408 inOrder.verify(mockWriteTx).write(writePath, writeData);
409 inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
410 inOrder.verify(mockWriteTx).delete(deletePath);
/**
 * Sends a batch with one write, then an empty batch flagged ready with a total message count
 * of 2. Expects a BatchedModificationsReply for the first, a ReadyTransactionReply for the
 * second, and the transaction actor to terminate.
 */
415 public void testOnReceiveBatchedModificationsReady() throws Exception {
416 new JavaTestKit(getSystem()) {{
418 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
419 "testOnReceiveBatchedModificationsReady");
// A second test kit watches the actor so that its termination can be observed.
421 JavaTestKit watcher = new JavaTestKit(getSystem());
422 watcher.watch(transaction);
424 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
425 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
426 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
427 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
// First message: a single write.
429 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
430 batched.addModification(new WriteModification(writePath, writeData));
432 transaction.tell(batched, getRef());
433 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
434 assertEquals("getNumBatched", 1, reply.getNumBatched());
// Second message: no modifications, marked ready; the total of 2 matches the messages sent.
436 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
437 batched.setReady(true);
438 batched.setTotalMessagesSent(2);
440 transaction.tell(batched, getRef());
441 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
442 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
/**
 * Makes the mocked write transaction throw TestException on write, sends the failing batch
 * and then a ready batch. Expects a Failure reply to each and the actor to terminate, then
 * rethrows the cause of the second failure so the expected-exception check can match it.
 */
446 @Test(expected=TestException.class)
447 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
448 new JavaTestKit(getSystem()) {{
450 DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
451 final ActorRef transaction = newTransactionActor(mockWriteTx,
452 "testOnReceiveBatchedModificationsFailure");
// A second test kit watches the actor so that its termination can be observed.
454 JavaTestKit watcher = new JavaTestKit(getSystem());
455 watcher.watch(transaction);
457 YangInstanceIdentifier path = TestModel.TEST_PATH;
458 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Arrange for the write of (path, node) to fail.
460 doThrow(new TestException()).when(mockWriteTx).write(path, node);
462 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
463 batched.addModification(new WriteModification(path, node));
465 transaction.tell(batched, getRef());
466 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// The follow-up ready message is also expected to be answered with a Failure.
468 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
469 batched.setReady(true);
470 batched.setTotalMessagesSent(2);
472 transaction.tell(batched, getRef());
473 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
474 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
// Surface the reported cause to JUnit.
476 if(failure != null) {
477 throw failure.cause();
/**
 * Sends a single ready batch that declares a total message count of 2. Expects a Failure
 * reply and the actor to terminate, then rethrows the failure cause, which the test expects
 * to be an IllegalStateException.
 */
482 @Test(expected=IllegalStateException.class)
483 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
484 new JavaTestKit(getSystem()) {{
486 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
487 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
// A second test kit watches the actor so that its termination can be observed.
489 JavaTestKit watcher = new JavaTestKit(getSystem());
490 watcher.watch(transaction);
// Only this one message is sent, yet it declares that two were sent.
492 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
493 batched.setReady(true);
494 batched.setTotalMessagesSent(2);
496 transaction.tell(batched, getRef());
498 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
499 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
// Surface the reported cause to JUnit.
501 if(failure != null) {
502 throw failure.cause();
/**
 * Checks ReadyTransaction handling for read-write transaction actors created at
 * HELIUM_2_VERSION: once with the serialized message and once with the plain object. In both
 * cases a ready reply is expected and the actor is expected to terminate.
 */
508 public void testOnReceivePreLithiumReadyTransaction() throws Exception {
// Serialized ReadyTransaction.
509 new JavaTestKit(getSystem()) {{
510 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
511 "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
513 JavaTestKit watcher = new JavaTestKit(getSystem());
514 watcher.watch(transaction);
516 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
518 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
519 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
// Unserialized ReadyTransaction, sent to a second actor.
523 new JavaTestKit(getSystem()) {{
524 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
525 "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
527 JavaTestKit watcher = new JavaTestKit(getSystem());
528 watcher.watch(transaction);
530 transaction.tell(new ReadyTransaction(), getRef());
532 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
533 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
/**
 * Writes a test container to the store, sends CreateSnapshot to a read-only transaction
 * actor and checks that the CaptureSnapshotReply deserializes to the same root node that a
 * direct read of the store returns. The actor is expected to terminate afterwards.
 */
538 public void testOnReceiveCreateSnapshot() throws Exception {
539 new JavaTestKit(getSystem()) {{
// Seed the store and capture the root node as the expected snapshot content.
540 ShardTest.writeToStore(store, TestModel.TEST_PATH,
541 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
543 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
544 YangInstanceIdentifier.builder().build());
546 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
547 "testOnReceiveCreateSnapshot");
551 transaction.tell(CreateSnapshot.INSTANCE, getRef());
553 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
555 assertNotNull("getSnapshot is null", reply.getSnapshot());
// The snapshot payload must decode to the root node read earlier.
557 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
558 reply.getSnapshot());
560 assertEquals("Root node", expectedRoot, actualRoot);
562 expectTerminated(duration("3 seconds"), transaction);
/**
 * Checks that a read-write transaction actor answers a serialized CloseTransaction with the
 * serializable CloseTransactionReply and then terminates.
 */
567 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
568 new JavaTestKit(getSystem()) {{
569 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
570 "testReadWriteTxOnReceiveCloseTransaction");
574 transaction.tell(new CloseTransaction().toSerializable(), getRef());
576 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
577 expectTerminated(duration("3 seconds"), transaction);
/**
 * Checks that a write-only transaction actor answers a serialized CloseTransaction with the
 * serializable CloseTransactionReply and then terminates.
 */
582 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
583 new JavaTestKit(getSystem()) {{
584 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
585 "testWriteTxOnReceiveCloseTransaction");
589 transaction.tell(new CloseTransaction().toSerializable(), getRef());
591 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
592 expectTerminated(duration("3 seconds"), transaction);
/**
 * Sends a serialized CloseTransaction to a read-only transaction actor and expects a
 * Terminated message as the next message received by the test kit.
 */
597 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
598 new JavaTestKit(getSystem()) {{
599 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
600 "testReadOnlyTxOnReceiveCloseTransaction");
604 transaction.tell(new CloseTransaction().toSerializable(), getRef());
// Unlike the read-write and write-only cases, no CloseTransactionReply is expected first.
606 expectMsgClass(duration("3 seconds"), Terminated.class);
/**
 * Delivers a serialized DeleteData directly to a read-only transaction actor and expects
 * the call to throw UnknownMessageException.
 */
610 @Test(expected=UnknownMessageException.class)
611 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
612 final ActorRef shard = createShard();
613 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
614 testSchemaContext, datastoreContext, shardStats, "txn",
615 DataStoreVersions.CURRENT_VERSION);
// A TestActorRef lets the test call receive() itself, so the exception reaches JUnit.
616 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
618 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
619 toSerializable(), ActorRef.noSender());
/**
 * Creates a read-write transaction actor with a 500 ms transaction idle timeout, sends it
 * nothing, and expects a Terminated message within 3 seconds.
 */
623 public void testShardTransactionInactivity() {
// Replace the default context so the actor created below uses the short idle timeout.
625 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
626 500, TimeUnit.MILLISECONDS).build();
628 new JavaTestKit(getSystem()) {{
629 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
630 "testShardTransactionInactivity");
// NOTE(review): no watch(transaction) call is visible before this expectation - confirm it exists.
634 expectMsgClass(duration("3 seconds"), Terminated.class);
/**
 * Unchecked exception used by testOnReceiveBatchedModificationsFailure as the error thrown
 * by the mocked write transaction.
 */
638 public static class TestException extends RuntimeException {
639 private static final long serialVersionUID = 1L;