log.debug("{}: Readying transaction {}, client version {}", name,
ready.getTransactionId(), ready.getTxnClientVersion());
log.debug("{}: Readying transaction {}, client version {}", name,
ready.getTransactionId(), ready.getTxnClientVersion());
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
*/
void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId());
*/
void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId());
cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
batched.getVersion());
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
batched.getVersion());
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
*/
void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
final TransactionIdentifier txId = message.getTransactionId();
*/
void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
final TransactionIdentifier txId = message.getTransactionId();
- final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification());
+ final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
+ message.getParticipatingShardNames());
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
BatchedModifications last = newModifications.getLast();
last.setDoCommitOnReady(from.isDoCommitOnReady());
BatchedModifications last = newModifications.getLast();
last.setDoCommitOnReady(from.isDoCommitOnReady());
- log.debug("{}: An exception occurred during canCommit for {}: {}", name,
- cohortEntry.getTransactionId(), failure);
+ log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(),
+ failure);
cohortCache.remove(cohortEntry.getTransactionId());
cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
cohortCache.remove(cohortEntry.getTransactionId());
cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
// between canCommit and ready and the entry was expired from the cache or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
// between canCommit and ready and the entry was expired from the cache or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
// or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
// or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
- Iterator<CohortEntry> iter = cohortCache.values().iterator();
- while (iter.hasNext()) {
- CohortEntry cohortEntry = iter.next();
- if (cohortEntry.isFailed()) {
- iter.remove();
- }
- }
+ cohortCache.values().removeIf(CohortEntry::isFailed);