log.debug("{}: Readying transaction {}, client version {}", name,
ready.getTransactionId(), ready.getTxnClientVersion());
- final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+ final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames());
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
}
cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
- cohortEntry.ready(cohortDecorator);
+ cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator);
if (batched.isDoCommitOnReady()) {
cohortEntry.setReplySender(sender);
*/
void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
final TransactionIdentifier txId = message.getTransactionId();
- final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification());
+ final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
+ message.getParticipatingShardNames());
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
BatchedModifications last = newModifications.getLast();
last.setDoCommitOnReady(from.isDoCommitOnReady());
- last.setReady(from.isReady());
+ if (from.isReady()) {
+ last.setReady(from.getParticipatingShardNames());
+ }
last.setTotalMessagesSent(newModifications.size());
return newModifications;
}
if (last != null) {
final boolean immediate = cohortEntry.isDoImmediateCommit();
last.setDoCommitOnReady(immediate);
- last.setReady(true);
+ last.setReady(cohortEntry.getParticipatingShardNames());
last.setTotalMessagesSent(newMessages.size());
messages.addAll(newMessages);