+ cohortCache.clear();
+ }
+
+ Collection<?> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
+ final Collection<VersionedExternalizableMessage> messages = new ArrayList<>();
+ for (ShardDataTreeCohort cohort : dataTree.getAndClearPendingTransactions()) {
+ CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier());
+ if (cohortEntry == null) {
+ continue;
+ }
+
+ final Deque<BatchedModifications> newMessages = new ArrayDeque<>();
+ cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
+ @Override
+ protected BatchedModifications getModifications() {
+ final BatchedModifications lastBatch = newMessages.peekLast();
+
+ if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) {
+ return lastBatch;
+ }
+
+ // Allocate a new message
+ final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionId(),
+ cohortEntry.getClientVersion());
+ newMessages.add(ret);
+ return ret;
+ }
+ });
+
+ final BatchedModifications last = newMessages.peekLast();
+ if (last != null) {
+ final boolean immediate = cohortEntry.isDoImmediateCommit();
+ last.setDoCommitOnReady(immediate);
+ last.setReady(cohortEntry.getParticipatingShardNames());
+ last.setTotalMessagesSent(newMessages.size());
+
+ messages.addAll(newMessages);
+
+ if (!immediate) {
+ switch (cohort.getState()) {
+ case CAN_COMMIT_COMPLETE:
+ case CAN_COMMIT_PENDING:
+ messages.add(new CanCommitTransaction(cohortEntry.getTransactionId(),
+ cohortEntry.getClientVersion()));
+ break;
+ case PRE_COMMIT_COMPLETE:
+ case PRE_COMMIT_PENDING:
+ messages.add(new CommitTransaction(cohortEntry.getTransactionId(),
+ cohortEntry.getClientVersion()));
+ break;
+ default:
+ break;
+ }
+ }
+ }