2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
35 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
37 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
38 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
40 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
45 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
46 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
47 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
48 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
49 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
50 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
51 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
52 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
53 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
54 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
57 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
59 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
60 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
61 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
62 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64 public class ShardTransactionTest extends AbstractActorTest {
// Schema context created once from the test model; passed to the ShardDataTree below.
66 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
// Short aliases for the three TransactionType constants used throughout the tests.
67 private static final TransactionType RO = TransactionType.READ_ONLY;
68 private static final TransactionType RW = TransactionType.READ_WRITE;
69 private static final TransactionType WO = TransactionType.WRITE_ONLY;
// Identifier used for every shard started by createShard() and for naming the ShardStats bean.
71 private static final ShardIdentifier SHARD_IDENTIFIER =
72 ShardIdentifier.builder().memberName("member-1")
73 .shardName("inventory").type("config").build();
// Datastore settings handed to shards and transaction actors. Not final because
// testShardTransactionInactivity() replaces it with a context that has a short idle timeout.
75 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
// Stats object passed to every ShardTransaction.props(...) call in this class.
77 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
// Data tree from which readOnlyTransaction()/readWriteTransaction() create their transactions.
79 private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
// Incremented each time a transaction is created so every transaction id is distinct.
81 private int txCounter = 0;
83 private ActorRef createShard() {
84 ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
85 schemaContext(TestModel.createTestContext()).props());
86 ShardTestKit.waitUntilLeader(shard);
90 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
91 return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
94 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
95 return newTransactionActor(type, transaction, null, name, version);
98 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
99 return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
102 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
104 Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
105 datastoreContext, shardStats, "txn", version);
106 return getSystem().actorOf(props, name);
109 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
110 return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
113 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
114 return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
// Checks that ReadData sent to a read-only and to a read-write transaction actor is answered
// with a reply whose node is non-null, for both the serializable and the plain message form.
118 public void testOnReceiveReadData() throws Exception {
119 new JavaTestKit(getSystem()) {{
120 final ActorRef shard = createShard();
// Run the shared scenario once per transaction type, passing the shard created above.
122 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
124 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
// Shared scenario: read an empty YangInstanceIdentifier (built with no path arguments) in both
// message forms and require a non-null node in each reply.
127 private void testOnReceiveReadData(final ActorRef transaction) {
// Serializable form of the request.
// NOTE(review): the rest of this tell(...) call is not visible in this view - presumably the
// sender argument getRef(), as in the other tell calls; confirm against the full file.
129 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
132 Object replySerialized =
133 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
135 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
// Plain (unserialized) form of the same request.
138 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
140 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
142 assertNotNull(reply.getNormalizedNode());
147 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
148 new JavaTestKit(getSystem()) {{
149 final ActorRef shard = createShard();
151 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
152 RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
154 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
155 RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
158 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
160 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
162 Object replySerialized =
163 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
165 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
168 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
170 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
172 assertTrue(reply.getNormalizedNode() == null);
// Checks that a read-only transaction actor created with DataStoreVersions.HELIUM_1_VERSION
// answers a serialized ReadData with a ShardTransactionMessages.ReadDataReply whose decoded
// node is non-null.
177 public void testOnReceiveReadDataHeliumR1() throws Exception {
178 new JavaTestKit(getSystem()) {{
179 ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
180 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
// NOTE(review): the rest of this tell(...) call is not visible in this view - presumably the
// sender argument getRef(); confirm against the full file.
182 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
185 ShardTransactionMessages.ReadDataReply replySerialized =
186 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
188 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
// Checks that DataExists for an empty YangInstanceIdentifier is answered with exists() == true
// by a read-only and by a read-write transaction actor, in both message forms.
193 public void testOnReceiveDataExistsPositive() throws Exception {
194 new JavaTestKit(getSystem()) {{
195 final ActorRef shard = createShard();
// Run the shared scenario once per transaction type, passing the shard created above.
197 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
198 "testDataExistsPositiveRO"));
200 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
201 "testDataExistsPositiveRW"));
// Shared scenario: send DataExists in serializable and plain form and require a positive reply.
204 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
// NOTE(review): the rest of this tell(...) call is not visible in this view - presumably the
// sender argument getRef(); confirm against the full file.
205 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
208 ShardTransactionMessages.DataExistsReply replySerialized =
209 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
211 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
// Plain (unserialized) form of the same request.
214 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
216 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
218 assertTrue(reply.exists());
223 public void testOnReceiveDataExistsNegative() throws Exception {
224 new JavaTestKit(getSystem()) {{
225 final ActorRef shard = createShard();
227 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
228 "testDataExistsNegativeRO"));
230 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
231 "testDataExistsNegativeRW"));
234 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
235 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
237 ShardTransactionMessages.DataExistsReply replySerialized =
238 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
240 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
243 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
245 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
247 assertFalse(reply.exists());
// Checks that a write-only transaction actor acknowledges WriteData for TestModel.TEST_PATH:
// the serialized HELIUM_2_VERSION message with ShardTransactionMessages.WriteDataReply, and the
// plain CURRENT_VERSION message with WriteDataReply.
252 public void testOnReceiveWriteData() {
253 new JavaTestKit(getSystem()) {{
254 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
255 "testOnReceiveWriteData");
// Serialized write using the HELIUM_2_VERSION encoding.
257 transaction.tell(new WriteData(TestModel.TEST_PATH,
258 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
259 toSerializable(), getRef());
261 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
263 // unserialized write
// NOTE(review): the rest of this tell(...) call is not visible in this view - presumably the
// sender argument getRef(); confirm against the full file.
264 transaction.tell(new WriteData(TestModel.TEST_PATH,
265 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
268 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
273 public void testOnReceiveHeliumR1WriteData() {
274 new JavaTestKit(getSystem()) {{
275 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
276 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
278 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
279 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
280 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
281 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
282 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
284 transaction.tell(serialized, getRef());
286 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
// Checks that a read-write transaction actor acknowledges MergeData for TestModel.TEST_PATH:
// the serialized HELIUM_2_VERSION message with ShardTransactionMessages.MergeDataReply, and the
// plain CURRENT_VERSION message with MergeDataReply.
291 public void testOnReceiveMergeData() {
292 new JavaTestKit(getSystem()) {{
// NOTE(review): the remaining argument(s) of this newTransactionActor(...) call (at least the
// actor name) are not visible in this view; confirm against the full file.
293 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
// Serialized merge using the HELIUM_2_VERSION encoding.
296 transaction.tell(new MergeData(TestModel.TEST_PATH,
297 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
298 toSerializable(), getRef());
300 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
// Plain (unserialized) merge.
// NOTE(review): the rest of this tell(...) call is not visible in this view - presumably the
// sender argument getRef(); confirm against the full file.
303 transaction.tell(new MergeData(TestModel.TEST_PATH,
304 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
307 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
312 public void testOnReceiveHeliumR1MergeData() throws Exception {
313 new JavaTestKit(getSystem()) {{
314 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
315 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
317 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
318 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
319 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
320 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
321 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
323 transaction.tell(serialized, getRef());
325 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
// Checks that a write-only transaction actor acknowledges DeleteData for TestModel.TEST_PATH:
// the serialized HELIUM_2_VERSION message with ShardTransactionMessages.DeleteDataReply, and
// the plain CURRENT_VERSION message with DeleteDataReply.
330 public void testOnReceiveDeleteData() throws Exception {
331 new JavaTestKit(getSystem()) {{
// NOTE(review): the remaining argument(s) of this newTransactionActor(...) call (at least the
// actor name) are not visible in this view; confirm against the full file.
332 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
// Serialized delete using the HELIUM_2_VERSION encoding.
335 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
336 toSerializable(), getRef());
338 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
// Plain (unserialized) delete.
341 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
343 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
348 public void testOnReceiveBatchedModifications() throws Exception {
349 new JavaTestKit(getSystem()) {{
351 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
352 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
353 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
354 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
356 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
357 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
358 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
359 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
361 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
362 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
363 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
365 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
367 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
368 batched.addModification(new WriteModification(writePath, writeData));
369 batched.addModification(new MergeModification(mergePath, mergeData));
370 batched.addModification(new DeleteModification(deletePath));
372 transaction.tell(batched, getRef());
374 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
375 assertEquals("getNumBatched", 3, reply.getNumBatched());
377 InOrder inOrder = Mockito.inOrder(mockModification);
378 inOrder.verify(mockModification).write(writePath, writeData);
379 inOrder.verify(mockModification).merge(mergePath, mergeData);
380 inOrder.verify(mockModification).delete(deletePath);
385 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
386 new JavaTestKit(getSystem()) {{
388 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
389 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
391 JavaTestKit watcher = new JavaTestKit(getSystem());
392 watcher.watch(transaction);
394 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
395 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
396 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
397 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
399 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
400 batched.addModification(new WriteModification(writePath, writeData));
402 transaction.tell(batched, getRef());
403 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
404 assertEquals("getNumBatched", 1, reply.getNumBatched());
406 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
407 batched.setReady(true);
408 batched.setTotalMessagesSent(2);
410 transaction.tell(batched, getRef());
411 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
412 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
417 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
418 new JavaTestKit(getSystem()) {{
420 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
421 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
423 JavaTestKit watcher = new JavaTestKit(getSystem());
424 watcher.watch(transaction);
426 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
427 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
428 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
429 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
431 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
432 batched.addModification(new WriteModification(writePath, writeData));
433 batched.setReady(true);
434 batched.setDoCommitOnReady(true);
435 batched.setTotalMessagesSent(1);
437 transaction.tell(batched, getRef());
438 expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
439 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
443 @Test(expected=TestException.class)
444 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
445 new JavaTestKit(getSystem()) {{
447 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
448 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
449 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
450 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
451 "testOnReceiveBatchedModificationsFailure");
453 JavaTestKit watcher = new JavaTestKit(getSystem());
454 watcher.watch(transaction);
456 YangInstanceIdentifier path = TestModel.TEST_PATH;
457 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
459 doThrow(new TestException()).when(mockModification).write(path, node);
461 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
462 batched.addModification(new WriteModification(path, node));
464 transaction.tell(batched, getRef());
465 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
467 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
468 batched.setReady(true);
469 batched.setTotalMessagesSent(2);
471 transaction.tell(batched, getRef());
472 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
473 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
475 if(failure != null) {
476 throw failure.cause();
481 @Test(expected=IllegalStateException.class)
482 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
483 new JavaTestKit(getSystem()) {{
485 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
486 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
488 JavaTestKit watcher = new JavaTestKit(getSystem());
489 watcher.watch(transaction);
491 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
492 batched.setReady(true);
493 batched.setTotalMessagesSent(2);
495 transaction.tell(batched, getRef());
497 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
498 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
500 if(failure != null) {
501 throw failure.cause();
// Writes a test container into the store, asks a read-only transaction actor for a snapshot via
// CreateSnapshot.INSTANCE, and checks that the deserialized snapshot equals the store contents
// read at an empty YangInstanceIdentifier; the actor is then expected to terminate.
507 public void testOnReceiveCreateSnapshot() throws Exception {
508 new JavaTestKit(getSystem()) {{
// Seed the data tree so the snapshot has content to compare against.
509 ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
510 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
512 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
513 YangInstanceIdentifier.builder().build());
515 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
516 "testOnReceiveCreateSnapshot");
// NOTE(review): no watch(transaction) call is visible in this view, yet expectTerminated(...)
// is used below - presumably the actor is watched here; confirm against the full file.
520 transaction.tell(CreateSnapshot.INSTANCE, getRef());
522 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
524 assertNotNull("getSnapshot is null", reply.getSnapshot());
526 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
527 reply.getSnapshot());
529 assertEquals("Root node", expectedRoot, actualRoot);
531 expectTerminated(duration("3 seconds"), transaction);
// Checks that a read-write transaction actor answers a serialized CloseTransaction with a
// message of type CloseTransactionReply.SERIALIZABLE_CLASS and then terminates.
536 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
537 new JavaTestKit(getSystem()) {{
538 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
539 "testReadWriteTxOnReceiveCloseTransaction");
// NOTE(review): no watch(transaction) call is visible in this view, yet expectTerminated(...)
// is used below - presumably the actor is watched here; confirm against the full file.
543 transaction.tell(new CloseTransaction().toSerializable(), getRef());
545 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
546 expectTerminated(duration("3 seconds"), transaction);
// Checks that a write-only transaction actor answers a serialized CloseTransaction with a
// message of type CloseTransactionReply.SERIALIZABLE_CLASS and then terminates.
551 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
552 new JavaTestKit(getSystem()) {{
553 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
554 "testWriteTxOnReceiveCloseTransaction");
// NOTE(review): no watch(transaction) call is visible in this view, yet expectTerminated(...)
// is used below - presumably the actor is watched here; confirm against the full file.
558 transaction.tell(new CloseTransaction().toSerializable(), getRef());
560 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
561 expectTerminated(duration("3 seconds"), transaction);
// Sends a serialized CloseTransaction to a read-only transaction actor and expects the next
// message received by this kit to be Terminated (no close reply is expected first).
566 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
567 new JavaTestKit(getSystem()) {{
568 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
569 "testReadOnlyTxOnReceiveCloseTransaction");
// NOTE(review): no watch(transaction) call is visible in this view, yet a Terminated message
// is expected below - presumably the actor is watched here; confirm against the full file.
573 transaction.tell(new CloseTransaction().toSerializable(), getRef());
575 expectMsgClass(duration("3 seconds"), Terminated.class);
579 @Test(expected=UnknownMessageException.class)
580 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
581 final ActorRef shard = createShard();
582 final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
583 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
584 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
586 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
587 toSerializable(), ActorRef.noSender());
// Configures a 500 ms shard transaction idle timeout, creates a read-write transaction actor,
// sends it nothing, and expects a Terminated message within 3 seconds.
591 public void testShardTransactionInactivity() {
// Replace the default context so the actor created below picks up the short idle timeout.
593 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
594 500, TimeUnit.MILLISECONDS).build();
596 new JavaTestKit(getSystem()) {{
597 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
598 "testShardTransactionInactivity");
// NOTE(review): no watch(transaction) call is visible in this view, yet a Terminated message
// is expected below - presumably the actor is watched here; confirm against the full file.
602 expectMsgClass(duration("3 seconds"), Terminated.class);
// Unchecked marker exception thrown by the mocked DataTreeModification in
// testOnReceiveBatchedModificationsFailure and named in that test's expected clause.
606 public static class TestException extends RuntimeException {
607 private static final long serialVersionUID = 1L;