- // Dequeue the next cohort entry waiting in the queue.
- currentCohortEntry = queuedCohortEntries.poll();
- if(currentCohortEntry != null) {
- currentCohortEntry.updateLastAccessTime();
- doCanCommit(currentCohortEntry);
+ currentCohortEntry = null;
+
+ log.debug("{}: currentTransactionComplete: {}", name, transactionID);
+
+ maybeProcessNextCohortEntry();
+ }
+ }
+
+ private void maybeProcessNextCohortEntry() {
+ // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
+ // clean out expired entries.
+ Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
+ while(iter.hasNext()) {
+ CohortEntry next = iter.next();
+ if(next.isReadyToCommit()) {
+ if(currentCohortEntry == null) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Next entry to canCommit {}", name, next);
+ }
+
+ iter.remove();
+ currentCohortEntry = next;
+ currentCohortEntry.updateLastAccessTime();
+ doCanCommit(currentCohortEntry);
+ }
+
+ break;
+ } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
+ log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
+ name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
+ } else if(!next.isAborted()) {
+ break;