- // Remove the entry from the cache now since the Tx will be aborted.
- removeCohortEntry(cohortEntry.getTransactionID());
- cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
+ if(sender != null) {
+ sender.tell(new Failure(e), self);
+ }
+ }
+ }
+
+ void checkForExpiredTransactions(final long timeout, final Shard shard) {
+ CohortEntry cohortEntry = getCurrentCohortEntry();
+ if(cohortEntry != null) {
+ if(cohortEntry.isExpired(timeout)) {
+ log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
+ name, cohortEntry.getTransactionID(), timeout);
+
+ handleAbort(cohortEntry.getTransactionID(), null, shard);
+ }
+ }
+
+ cleanupExpiredCohortEntries();
+ }
+
+ void abortPendingTransactions(final String reason, final Shard shard) {
+ if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+ return;
+ }
+
+ List<CohortEntry> cohortEntries = new ArrayList<>();
+
+ if(currentCohortEntry != null) {
+ cohortEntries.add(currentCohortEntry);
+ currentCohortEntry = null;
+ }
+
+ cohortEntries.addAll(queuedCohortEntries);
+ queuedCohortEntries.clear();
+
+ for(CohortEntry cohortEntry: cohortEntries) {
+ if(cohortEntry.getReplySender() != null) {
+ cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+ }