+ @Test
+ public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
+ dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
+
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitWithPriorExpiredCohortEntries");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification1 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final String transactionID2 = "tx2";
+ final MutableCompositeModification modification2 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final String transactionID3 = "tx3";
+ final MutableCompositeModification modification3 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+ TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+ cohort3, modification3, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
+ // should expire from the queue and the last one should be processed.
+
+ shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
+ dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
+
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitWithSubsequentExpiredCohortEntry");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ final String transactionID1 = "tx1";
+ final MutableCompositeModification modification1 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // CanCommit the first one so it's the current in-progress CohortEntry.
+
+ shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Ready the second Tx.
+
+ final String transactionID2 = "tx2";
+ final MutableCompositeModification modification2 = new MutableCompositeModification();
+ final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+ TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
+
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Ready the third Tx.
+
+ final String transactionID3 = "tx3";
+ final DataTreeModification modification3 = dataStore.newModification();
+ new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
+ .apply(modification3);
+ modification3.ready();
+ final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
+
+ shard.tell(readyMessage, getRef());
+
+ // Commit the first Tx. After completing, the second should expire from the queue and the third
+ // Tx committed.
+
+ shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Expect commit reply from the third Tx.
+
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
+ assertNotNull(TestModel.TEST2_PATH + " not found", node);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+