import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import javax.annotation.Nonnull;
@Override
public void onFailure(final Throwable failure) {
- log.debug("{}: An exception occurred during canCommit for {}: {}", name,
- cohortEntry.getTransactionId(), failure);
+ log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(),
+ failure);
cohortCache.remove(cohortEntry.getTransactionId());
cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
// between canCommit and ready and the entry was expired from the cache or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
- log.error(ex.getMessage());
+ log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex);
sender.tell(new Failure(ex), shard.self());
return;
}
// or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
- log.error(ex.getMessage());
+ log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex);
sender.tell(new Failure(ex), shard.self());
return;
}
}
void checkForExpiredTransactions(final long timeout, final Shard shard) {
- Iterator<CohortEntry> iter = cohortCache.values().iterator();
- while (iter.hasNext()) {
- CohortEntry cohortEntry = iter.next();
- if (cohortEntry.isFailed()) {
- iter.remove();
- }
- }
+ cohortCache.values().removeIf(CohortEntry::isFailed);
}
void abortPendingTransactions(final String reason, final Shard shard) {