+ assertModification(transaction, MergeModification.class);
+
+ //unserialized merge
+ transaction.tell(new MergeData(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+ getRef());
+
+ expectMsgClass(duration("5 seconds"), MergeDataReply.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveHeliumR1MergeData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.HELIUM_1_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData");
+
+ Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
+ .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+ .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+
+ transaction.tell(serialized, getRef());
+
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
+
+ assertModification(transaction, MergeModification.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveDeleteData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
+
+ transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
+
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
+
+ assertModification(transaction, DeleteModification.class);
+
+ //unserialized merge
+ transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+
+ expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
+ }};
+ }
+
+
+ @Test
+ public void testOnReceiveReadyTransaction() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
+
+ watch(transaction);
+
+ transaction.tell(new ReadyTransaction().toSerializable(), getRef());
+
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+ Terminated.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+ Terminated.class);
+ }};
+
+ // test
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
+
+ watch(transaction);
+
+ transaction.tell(new ReadyTransaction(), getRef());
+
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+ Terminated.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+ Terminated.class);
+ }};
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testOnReceiveCloseTransaction() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
+
+ watch(transaction);
+
+ transaction.tell(new CloseTransaction().toSerializable(), getRef());
+
+ expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
+ expectTerminated(duration("3 seconds"), transaction);
+ }};
+ }
+
+ @Test(expected=UnknownMessageException.class)
+ public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.CURRENT_VERSION);
+ final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
+
+ transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
+ }