1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import static org.mockito.Mockito.doThrow;
8 import akka.actor.ActorRef;
9 import akka.actor.Props;
10 import akka.actor.Status.Failure;
11 import akka.actor.Terminated;
12 import akka.testkit.JavaTestKit;
13 import akka.testkit.TestActorRef;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.Collections;
16 import java.util.concurrent.TimeUnit;
17 import org.junit.Before;
18 import org.junit.Test;
19 import org.mockito.InOrder;
20 import org.mockito.Mockito;
21 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
22 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
23 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
32 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
34 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
36 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
42 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
43 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
45 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.Modification;
47 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
48 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
49 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
50 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
51 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
52 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
53 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
54 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
60 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
61 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
62 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64 public class ShardTransactionTest extends AbstractActorTest {
66 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
68 private static final ShardIdentifier SHARD_IDENTIFIER =
69 ShardIdentifier.builder().memberName("member-1")
70 .shardName("inventory").type("config").build();
72 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
74 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
76 private final InMemoryDOMDataStore store =
77 new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
81 store.onGlobalContextUpdated(testSchemaContext);
84 private ActorRef createShard(){
85 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
86 Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
89 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
90 return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
93 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
94 return newTransactionActor(transaction, null, name, version);
97 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
98 return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
101 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
103 Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
104 datastoreContext, shardStats, "txn", version);
105 return getSystem().actorOf(props, name);
109 public void testOnReceiveReadData() throws Exception {
110 new JavaTestKit(getSystem()) {{
111 final ActorRef shard = createShard();
113 testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
115 testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
118 private void testOnReceiveReadData(final ActorRef transaction) {
120 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
123 Object replySerialized =
124 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
126 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
129 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
131 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
133 assertNotNull(reply.getNormalizedNode());
138 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
139 new JavaTestKit(getSystem()) {{
140 final ActorRef shard = createShard();
142 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
143 store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
145 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
146 store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
149 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
151 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
153 Object replySerialized =
154 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
156 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
159 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
161 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
163 assertTrue(reply.getNormalizedNode() == null);
168 public void testOnReceiveReadDataHeliumR1() throws Exception {
169 new JavaTestKit(getSystem()) {{
170 ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
171 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
173 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
176 ShardTransactionMessages.ReadDataReply replySerialized =
177 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
179 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
184 public void testOnReceiveDataExistsPositive() throws Exception {
185 new JavaTestKit(getSystem()) {{
186 final ActorRef shard = createShard();
188 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
189 "testDataExistsPositiveRO"));
191 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
192 "testDataExistsPositiveRW"));
195 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
196 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
199 ShardTransactionMessages.DataExistsReply replySerialized =
200 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
202 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
205 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
207 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
209 assertTrue(reply.exists());
214 public void testOnReceiveDataExistsNegative() throws Exception {
215 new JavaTestKit(getSystem()) {{
216 final ActorRef shard = createShard();
218 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
219 "testDataExistsNegativeRO"));
221 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
222 "testDataExistsNegativeRW"));
225 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
226 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
228 ShardTransactionMessages.DataExistsReply replySerialized =
229 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
231 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
234 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
236 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
238 assertFalse(reply.exists());
242 private void assertModification(final ActorRef subject,
243 final Class<? extends Modification> modificationType) {
244 new JavaTestKit(getSystem()) {{
245 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
247 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
248 GetCompositeModificationReply.class).getModification();
250 assertTrue(compositeModification.getModifications().size() == 1);
251 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
256 public void testOnReceiveWriteData() throws Exception {
257 new JavaTestKit(getSystem()) {{
258 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
259 "testOnReceiveWriteData");
261 transaction.tell(new WriteData(TestModel.TEST_PATH,
262 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
263 toSerializable(), getRef());
265 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
267 assertModification(transaction, WriteModification.class);
269 // unserialized write
270 transaction.tell(new WriteData(TestModel.TEST_PATH,
271 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
274 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
279 public void testOnReceiveHeliumR1WriteData() throws Exception {
280 new JavaTestKit(getSystem()) {{
281 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
282 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
284 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
285 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
286 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
287 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
288 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
290 transaction.tell(serialized, getRef());
292 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
294 assertModification(transaction, WriteModification.class);
299 public void testOnReceiveMergeData() throws Exception {
300 new JavaTestKit(getSystem()) {{
301 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
304 transaction.tell(new MergeData(TestModel.TEST_PATH,
305 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
306 toSerializable(), getRef());
308 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
310 assertModification(transaction, MergeModification.class);
313 transaction.tell(new MergeData(TestModel.TEST_PATH,
314 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
317 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
322 public void testOnReceiveHeliumR1MergeData() throws Exception {
323 new JavaTestKit(getSystem()) {{
324 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
325 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
327 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
328 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
329 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
330 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
331 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
333 transaction.tell(serialized, getRef());
335 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
337 assertModification(transaction, MergeModification.class);
342 public void testOnReceiveDeleteData() throws Exception {
343 new JavaTestKit(getSystem()) {{
344 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
347 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
348 toSerializable(), getRef());
350 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
352 assertModification(transaction, DeleteModification.class);
355 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
357 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
362 public void testOnReceiveBatchedModifications() throws Exception {
363 new JavaTestKit(getSystem()) {{
365 DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
366 final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
368 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
369 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
370 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
371 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
373 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
374 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
375 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
377 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
379 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
380 batched.addModification(new WriteModification(writePath, writeData));
381 batched.addModification(new MergeModification(mergePath, mergeData));
382 batched.addModification(new DeleteModification(deletePath));
384 transaction.tell(batched, getRef());
386 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
387 assertEquals("getNumBatched", 3, reply.getNumBatched());
389 JavaTestKit verification = new JavaTestKit(getSystem());
390 transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
392 CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
393 GetCompositeModificationReply.class).getModification();
395 assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
397 WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
398 assertEquals("getPath", writePath, write.getPath());
399 assertEquals("getData", writeData, write.getData());
401 MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
402 assertEquals("getPath", mergePath, merge.getPath());
403 assertEquals("getData", mergeData, merge.getData());
405 DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
406 assertEquals("getPath", deletePath, delete.getPath());
408 InOrder inOrder = Mockito.inOrder(mockWriteTx);
409 inOrder.verify(mockWriteTx).write(writePath, writeData);
410 inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
411 inOrder.verify(mockWriteTx).delete(deletePath);
416 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
417 new JavaTestKit(getSystem()) {{
419 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
420 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
422 JavaTestKit watcher = new JavaTestKit(getSystem());
423 watcher.watch(transaction);
425 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
426 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
427 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
428 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
430 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
431 batched.addModification(new WriteModification(writePath, writeData));
433 transaction.tell(batched, getRef());
434 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
435 assertEquals("getNumBatched", 1, reply.getNumBatched());
437 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
438 batched.setReady(true);
439 batched.setTotalMessagesSent(2);
441 transaction.tell(batched, getRef());
442 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
443 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
448 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
449 new JavaTestKit(getSystem()) {{
451 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
452 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
454 JavaTestKit watcher = new JavaTestKit(getSystem());
455 watcher.watch(transaction);
457 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
458 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
459 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
460 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
462 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
463 batched.addModification(new WriteModification(writePath, writeData));
464 batched.setReady(true);
465 batched.setDoCommitOnReady(true);
466 batched.setTotalMessagesSent(1);
468 transaction.tell(batched, getRef());
469 expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
470 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
474 @Test(expected=TestException.class)
475 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
476 new JavaTestKit(getSystem()) {{
478 DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
479 final ActorRef transaction = newTransactionActor(mockWriteTx,
480 "testOnReceiveBatchedModificationsFailure");
482 JavaTestKit watcher = new JavaTestKit(getSystem());
483 watcher.watch(transaction);
485 YangInstanceIdentifier path = TestModel.TEST_PATH;
486 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
488 doThrow(new TestException()).when(mockWriteTx).write(path, node);
490 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
491 batched.addModification(new WriteModification(path, node));
493 transaction.tell(batched, getRef());
494 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
496 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
497 batched.setReady(true);
498 batched.setTotalMessagesSent(2);
500 transaction.tell(batched, getRef());
501 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
502 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
504 if(failure != null) {
505 throw failure.cause();
510 @Test(expected=IllegalStateException.class)
511 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
512 new JavaTestKit(getSystem()) {{
514 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
515 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
517 JavaTestKit watcher = new JavaTestKit(getSystem());
518 watcher.watch(transaction);
520 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
521 batched.setReady(true);
522 batched.setTotalMessagesSent(2);
524 transaction.tell(batched, getRef());
526 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
527 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
529 if(failure != null) {
530 throw failure.cause();
536 public void testOnReceivePreLithiumReadyTransaction() throws Exception {
537 new JavaTestKit(getSystem()) {{
538 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
539 "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
541 JavaTestKit watcher = new JavaTestKit(getSystem());
542 watcher.watch(transaction);
544 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
546 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
547 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
551 new JavaTestKit(getSystem()) {{
552 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
553 "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
555 JavaTestKit watcher = new JavaTestKit(getSystem());
556 watcher.watch(transaction);
558 transaction.tell(new ReadyTransaction(), getRef());
560 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
561 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
566 public void testOnReceiveCreateSnapshot() throws Exception {
567 new JavaTestKit(getSystem()) {{
568 ShardTest.writeToStore(store, TestModel.TEST_PATH,
569 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
571 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
572 YangInstanceIdentifier.builder().build());
574 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
575 "testOnReceiveCreateSnapshot");
579 transaction.tell(CreateSnapshot.INSTANCE, getRef());
581 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
583 assertNotNull("getSnapshot is null", reply.getSnapshot());
585 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
586 reply.getSnapshot());
588 assertEquals("Root node", expectedRoot, actualRoot);
590 expectTerminated(duration("3 seconds"), transaction);
595 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
596 new JavaTestKit(getSystem()) {{
597 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
598 "testReadWriteTxOnReceiveCloseTransaction");
602 transaction.tell(new CloseTransaction().toSerializable(), getRef());
604 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
605 expectTerminated(duration("3 seconds"), transaction);
610 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
611 new JavaTestKit(getSystem()) {{
612 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
613 "testWriteTxOnReceiveCloseTransaction");
617 transaction.tell(new CloseTransaction().toSerializable(), getRef());
619 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
620 expectTerminated(duration("3 seconds"), transaction);
625 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
626 new JavaTestKit(getSystem()) {{
627 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
628 "testReadOnlyTxOnReceiveCloseTransaction");
632 transaction.tell(new CloseTransaction().toSerializable(), getRef());
634 expectMsgClass(duration("3 seconds"), Terminated.class);
638 @Test(expected=UnknownMessageException.class)
639 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
640 final ActorRef shard = createShard();
641 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
642 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
643 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
645 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
646 toSerializable(), ActorRef.noSender());
650 public void testShardTransactionInactivity() {
652 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
653 500, TimeUnit.MILLISECONDS).build();
655 new JavaTestKit(getSystem()) {{
656 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
657 "testShardTransactionInactivity");
661 expectMsgClass(duration("3 seconds"), Terminated.class);
665 public static class TestException extends RuntimeException {
666 private static final long serialVersionUID = 1L;