+ //unserialized merge
+ transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+
+ expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
+ }};
+ }
+
+
+ @Test
+ public void testOnReceiveReadyTransaction() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
+
+ watch(transaction);
+
+ transaction.tell(new ReadyTransaction().toSerializable(), getRef());
+
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+ Terminated.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+ Terminated.class);
+ }};
+
+ // test
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
+
+ watch(transaction);
+
+ transaction.tell(new ReadyTransaction(), getRef());
+
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+ Terminated.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+ Terminated.class);
+ }};
+
+ }
+
+ @Test
+ public void testOnReceiveCloseTransaction() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
+
+ watch(transaction);
+
+ transaction.tell(new CloseTransaction().toSerializable(), getRef());
+
+ expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
+ expectTerminated(duration("3 seconds"), transaction);
+ }};
+ }
+
+ @Test(expected=UnknownMessageException.class)
+ public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
+
+ transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
+ }
+
+ @Test
+ public void testShardTransactionInactivity() {
+
+ datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
+ Duration.create(500, TimeUnit.MILLISECONDS)).build();
+
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final ActorRef transaction =
+ getSystem().actorOf(props, "testShardTransactionInactivity");