+ canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ }};
+ }
+
+ @Test
+ public void testAbortAfterReady() throws Throwable {
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ // Ready a tx.
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the AbortTransaction message.
+
+ shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.class);
+
+ assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+ // Now send CanCommitTransaction - should fail.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+ assertTrue("Failure type", failure instanceof IllegalStateException);
+
+ // Ready and CanCommit another and verify success.
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+ }};
+ }
+
+ @Test
+ public void testAbortQueuedTransaction() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ // Ready 3 tx's.
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID3 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Abort the second tx while it's queued.
+
+ shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.class);
+
+ // Commit the other 2.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);