- public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
- final ActorRef shard) {
- String transactionID = canCommit.getTransactionID();
- if(log.isDebugEnabled()) {
- log.debug("{}: Processing canCommit for transaction {} for shard {}",
- name, transactionID, shard.path());
+ void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
+ final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
+ message.getTransactionID());
+ final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
+ cohortCache.put(message.getTransactionID(), cohortEntry);
+ cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
+
+ if(!queueCohortEntry(cohortEntry, sender, shard)) {
+ return;
+ }
+
+ log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
+
+ if (message.isDoCommitOnReady()) {
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
+ } else {
+ sender.tell(readyTransactionReply(shard), shard.self());
+ }
+ }
+
+ private void handleCanCommit(CohortEntry cohortEntry) {
+ String transactionID = cohortEntry.getTransactionID();
+
+ cohortEntry.updateLastAccessTime();
+
+ if(currentCohortEntry != null) {
+ // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
+ // queue and will get processed after all prior entries complete.
+
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
+ name, currentCohortEntry.getTransactionID(), transactionID);
+ }
+
+ return;