+ log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
+
+ if (message.isDoCommitOnReady()) {
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
+ } else {
+ sender.tell(readyTransactionReply(shard), shard.self());
+ }
+ }
+
+ Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
+ final int maxModificationsPerBatch) {
+ CohortEntry cohortEntry = getAndRemoveCohortEntry(new StringIdentifier(from.getTransactionID()));
+ if(cohortEntry == null || cohortEntry.getTransaction() == null) {
+ return Collections.singletonList(from);
+ }
+
+ cohortEntry.applyModifications(from.getModifications());
+
+ final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+ cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
+ @Override
+ protected BatchedModifications getModifications() {
+ if(newModifications.isEmpty() ||
+ newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+ newModifications.add(new BatchedModifications(from.getTransactionID(),
+ from.getVersion(), from.getTransactionChainID()));
+ }
+
+ return newModifications.getLast();
+ }
+ });
+
+ BatchedModifications last = newModifications.getLast();
+ last.setDoCommitOnReady(from.isDoCommitOnReady());
+ last.setReady(from.isReady());
+ last.setTotalMessagesSent(newModifications.size());
+ return newModifications;
+ }
+
+ private void handleCanCommit(CohortEntry cohortEntry) {
+ cohortEntry.updateLastAccessTime();
+
+ if(currentCohortEntry != null) {
+ // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
+ // queue and will get processed after all prior entries complete.
+
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
+ name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
+ }
+
+ return;
+ }
+
+ // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
+ // it the current entry and proceed with canCommit.
+ // Purposely checking reference equality here.
+ if(queuedCohortEntries.peek() == cohortEntry) {
+ currentCohortEntry = queuedCohortEntries.poll();
+ doCanCommit(currentCohortEntry);
+ } else {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
+ queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
+ cohortEntry.getTransactionID());
+ }
+ }
+ }
+
+ /**
+ * This method handles the canCommit phase for a transaction.
+ *
+ * @param transactionID the ID of the transaction to canCommit
+ * @param sender the actor to which to send the response
+ * @param shard the transaction's shard actor
+ */
+ void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {