+ if(sender != null) {
+ sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+ }
+ } catch (Exception e) {
+ log.error("{}: An exception happened during abort", name, e);
+
+ if(sender != null) {
+ sender.tell(new Failure(e), self);
+ }
+ }
+ }
+
+ void checkForExpiredTransactions(final long timeout, final Shard shard) {
+ CohortEntry cohortEntry = getCurrentCohortEntry();
+ if(cohortEntry != null) {
+ if(cohortEntry.isExpired(timeout)) {
+ log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
+ name, cohortEntry.getTransactionID(), timeout);
+
+ handleAbort(cohortEntry.getTransactionID(), null, shard);
+ }
+ }
+
+ cleanupExpiredCohortEntries();
+ }
+
+ void abortPendingTransactions(final String reason, final Shard shard) {
+ if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+ return;
+ }
+
+ List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
+
+ log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
+
+ for(CohortEntry cohortEntry: cohortEntries) {
+ if(cohortEntry.getReplySender() != null) {
+ cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+ }
+ }
+ }
+
+ private List<CohortEntry> getAndClearPendingCohortEntries() {
+ List<CohortEntry> cohortEntries = new ArrayList<>();
+
+ if(currentCohortEntry != null) {
+ cohortEntries.add(currentCohortEntry);
+ cohortCache.remove(currentCohortEntry.getTransactionID());
+ currentCohortEntry = null;
+ }
+
+ for(CohortEntry cohortEntry: queuedCohortEntries) {
+ cohortEntries.add(cohortEntry);
+ cohortCache.remove(cohortEntry.getTransactionID());
+ }
+
+ queuedCohortEntries.clear();
+ return cohortEntries;
+ }
+
+ Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
+ if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ Collection<Object> messages = new ArrayList<>();
+ List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
+ for(CohortEntry cohortEntry: cohortEntries) {
+ if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
+ continue;