+ cohortEntry.applyModifications(batched.getModifications());
+
+ if(batched.isReady()) {
+ if(cohortEntry.getLastBatchedModificationsException() != null) {
+ cohortCache.remove(cohortEntry.getTransactionID());
+ throw cohortEntry.getLastBatchedModificationsException();
+ }
+
+ if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+ cohortCache.remove(cohortEntry.getTransactionID());
+ throw new IllegalStateException(String.format(
+ "The total number of batched messages received %d does not match the number sent %d",
+ cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
+ }
+
+ if(!queueCohortEntry(cohortEntry, sender, shard)) {
+ return;
+ }
+
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Readying Tx {}, client version {}", name,
+ batched.getTransactionID(), batched.getVersion());
+ }
+
+ cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
+
+ if(batched.isDoCommitOnReady()) {
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
+ } else {
+ sender.tell(readyTransactionReply(shard), shard.self());
+ }
+ } else {
+ sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
+ }
+ }
+
+ /**
+ * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
+ * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
+ *
+ * @param message the ReadyLocalTransaction message to process
+ * @param sender the sender of the message
+ * @param shard the transaction's shard actor
+ */
+ void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
+ final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
+ message.getTransactionID());
+ final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
+ DataStoreVersions.CURRENT_VERSION);
+ cohortCache.put(message.getTransactionID(), cohortEntry);
+ cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
+
+ if(!queueCohortEntry(cohortEntry, sender, shard)) {
+ return;
+ }
+
+ log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
+
+ if (message.isDoCommitOnReady()) {
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
+ } else {
+ sender.tell(readyTransactionReply(shard), shard.self());
+ }
+ }
+
+ private void handleCanCommit(CohortEntry cohortEntry) {
+ String transactionID = cohortEntry.getTransactionID();
+
+ cohortEntry.updateLastAccessTime();
+
+ if(currentCohortEntry != null) {
+ // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
+ // queue and will get processed after all prior entries complete.
+
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
+ name, currentCohortEntry.getTransactionID(), transactionID);
+ }
+
+ return;
+ }
+
+ // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
+ // it the current entry and proceed with canCommit.
+ // Purposely checking reference equality here.
+ if(queuedCohortEntries.peek() == cohortEntry) {
+ currentCohortEntry = queuedCohortEntries.poll();
+ doCanCommit(currentCohortEntry);
+ } else {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
+ name, queuedCohortEntries.peek().getTransactionID(), transactionID);
+ }
+ }
+ }
+
+ /**
+ * This method handles the canCommit phase for a transaction.
+ *
+ * @param transactionID the ID of the transaction to canCommit
+ * @param sender the actor to which to send the response
+ * @param shard the transaction's shard actor
+ */
+ void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {