+ cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
+
+ if(batched.isDoCommitOnReady()) {
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
+ } else {
+ sender.tell(readyTransactionReply(shard), shard.self());
+ }
+ } else {
+ sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
+ }
+ }
+
+ /**
+ * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
+ * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
+ *
+ * @param message
+ * @param sender
+ * @param shard
+ */
+ void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
+ final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification());
+ final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
+ cohortCache.put(message.getTransactionID(), cohortEntry);
+ cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
+
+ if(!queueCohortEntry(cohortEntry, sender, shard)) {
+ return;
+ }
+
+ log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
+
+ if (message.isDoCommitOnReady()) {
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
+ } else {
+ sender.tell(readyTransactionReply(shard), shard.self());
+ }
+ }
+
+ private void handleCanCommit(CohortEntry cohortEntry) {
+ String transactionID = cohortEntry.getTransactionID();
+
+ cohortEntry.updateLastAccessTime();
+
+ if(currentCohortEntry != null) {
+ // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
+ // queue and will get processed after all prior entries complete.
+
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
+ name, currentCohortEntry.getTransactionID(), transactionID);
+ }
+
+ return;