- last.setTotalMessagesSent(newModifications.size());
- messages.addAll(newModifications);
-
- if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
- messages.add(new CanCommitTransaction(cohortEntry.getTransactionID().getString(),
- cohortEntry.getClientVersion()));
- }
-
- if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
- messages.add(new CommitTransaction(cohortEntry.getTransactionID().getString(),
- cohortEntry.getClientVersion()));
- }
- }
- }
-
- return messages;
- }
-
- /**
- * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
- * matches the current entry.
- *
- * @param transactionID the ID of the transaction
- * @return the current CohortEntry or null if the given transaction ID does not match the
- * current entry.
- */
- CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
- if(isCurrentTransaction(transactionID)) {
- return currentCohortEntry;
- }
-
- return null;
- }
-
- CohortEntry getCurrentCohortEntry() {
- return currentCohortEntry;
- }
-
- CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
- return cohortCache.remove(transactionID);
- }
-
- boolean isCurrentTransaction(Identifier transactionID) {
- return currentCohortEntry != null &&
- currentCohortEntry.getTransactionID().equals(transactionID);
- }
-
- /**
- * This method is called when a transaction is complete, successful or not. If the given
- * given transaction ID matches the current in-progress transaction, the next cohort entry,
- * if any, is dequeued and processed.
- *
- * @param transactionID the ID of the completed transaction
- * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
- * the cache.
- */
- void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
- if(removeCohortEntry) {
- cohortCache.remove(transactionID);
- }
-
- if(isCurrentTransaction(transactionID)) {
- currentCohortEntry = null;
-
- log.debug("{}: currentTransactionComplete: {}", name, transactionID);
-
- maybeProcessNextCohortEntry();
- }
- }
-
- private void maybeProcessNextCohortEntry() {
- // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
- // clean out expired entries.
- final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
- while(iter.hasNext()) {
- final CohortEntry next = iter.next();
- if(next.isReadyToCommit()) {
- if(currentCohortEntry == null) {
- if(log.isDebugEnabled()) {
- log.debug("{}: Next entry to canCommit {}", name, next);
+ last.setTotalMessagesSent(newMessages.size());
+
+ messages.addAll(newMessages);
+
+ if (!immediate) {
+ switch (cohort.getState()) {
+ case CAN_COMMIT_COMPLETE:
+ case CAN_COMMIT_PENDING:
+ messages.add(new CanCommitTransaction(cohortEntry.getTransactionId(),
+ cohortEntry.getClientVersion()));
+ break;
+ case PRE_COMMIT_COMPLETE:
+ case PRE_COMMIT_PENDING:
+ messages.add(new CommitTransaction(cohortEntry.getTransactionId(),
+ cohortEntry.getClientVersion()));
+ break;
+ default:
+ break;