+ log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
+
+ if (message.isDoCommitOnReady()) {
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
+ } else {
+ sender.tell(readyTransactionReply(shard), shard.self());
+ }
+ }
+
+ private void handleCanCommit(CohortEntry cohortEntry) {
+ String transactionID = cohortEntry.getTransactionID();
+
+ cohortEntry.updateLastAccessTime();
+
+ if(currentCohortEntry != null) {
+ // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
+ // queue and will get processed after all prior entries complete.
+
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
+ name, currentCohortEntry.getTransactionID(), transactionID);
+ }
+
+ return;
+ }
+
+ // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
+ // it the current entry and proceed with canCommit.
+ // Purposely checking reference equality here.
+ if(queuedCohortEntries.peek() == cohortEntry) {
+ currentCohortEntry = queuedCohortEntries.poll();
+ doCanCommit(currentCohortEntry);
+ } else {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
+ name, queuedCohortEntries.peek().getTransactionID(), transactionID);
+ }
+ }
+ }
+
+ /**
+ * This method handles the canCommit phase for a transaction.
+ *
+ * @param transactionID the ID of the transaction to canCommit
+ * @param sender the actor to which to send the response
+ * @param shard the transaction's shard actor
+ */
+ void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {