+ public void testTransactionCommitWithPriorExpiredCohortEntries() throws Exception {
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitWithPriorExpiredCohortEntries");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID3 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // All Tx's are readied. We'll send canCommit for the last one
+ // but not the others. The others
+ // should expire from the queue and the last one should be
+ // processed.
+
+ shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+ }
+ };
+ }
+
+ @Test
+ public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Exception {
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitWithSubsequentExpiredCohortEntry");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // CanCommit the first Tx so it's the current in-progress Tx.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ // Ready the second Tx.
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Ready the third Tx.
+
+ final TransactionIdentifier transactionID3 = nextTransactionId();
+ final DataTreeModification modification3 = dataStore.newModification();
+ new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
+ .apply(modification3);
+ modification3.ready();
+ final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
+ true);
+ shard.tell(readyMessage, getRef());
+
+ // Commit the first Tx. After completing, the second should
+ // expire from the queue and the third
+ // Tx committed.
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ // Expect commit reply from the third Tx.
+
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
+ assertNotNull(TestModel.TEST2_PATH + " not found", node);
+ }
+ };
+ }
+
+ @Test
+ public void testCanCommitBeforeReadyFailure() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitBeforeReadyFailure");
+
+ shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+ }
+ };
+ }
+
+ @Test
+ public void testAbortAfterCanCommit() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+ final Timeout timeout = new Timeout(duration);
+
+ // Ready 2 transactions - the first one will be aborted.
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message for the first Tx.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
+ .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message for the 2nd Tx. This
+ // should get queued and
+ // processed after the first Tx completes.
+
+ final Future<Object> canCommitFuture = Patterns.ask(shard,
+ new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
+
+ // Send the AbortTransaction message for the first Tx. This
+ // should trigger the 2nd
+ // Tx to proceed.
+
+ shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.class);
+
+ // Wait for the 2nd Tx to complete the canCommit phase.
+
+ canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ }
+ };
+ }
+
+ @Test
+ public void testAbortAfterReady() throws Exception {
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ // Ready a tx.
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the AbortTransaction message.
+
+ shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.class);
+
+ assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+ // Now send CanCommitTransaction - should fail.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ final Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+ assertTrue("Failure type", failure instanceof IllegalStateException);
+
+ // Ready and CanCommit another and verify success.
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+ }
+ };
+ }
+
+ @Test
+ public void testAbortQueuedTransaction() throws Exception {
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ // Ready 3 tx's.
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID3 = nextTransactionId();
+ shard.tell(
+ newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1),
+ getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Abort the second tx while it's queued.
+
+ shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.class);
+
+ // Commit the other 2.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
+ }
+ };