+ void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
+ CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
+ if(cohortEntry != null) {
+ // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
+ // aborted during replication in which case we may still commit locally if replication
+ // succeeds.
+ currentTransactionComplete(transactionID, false);
+ } else {
+ cohortEntry = getAndRemoveCohortEntry(transactionID);
+ }
+
+ if(cohortEntry == null) {
+ return;
+ }
+
+ log.debug("{}: Aborting transaction {}", name, transactionID);
+
+ final ActorRef self = shard.getSelf();
+ try {
+ cohortEntry.abort();
+
+ shard.getShardMBean().incrementAbortTransactionsCount();
+
+ if(sender != null) {
+ sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+ }
+ } catch (Exception e) {
+ log.error("{}: An exception happened during abort", name, e);
+
+ if(sender != null) {
+ sender.tell(new Failure(e), self);
+ }
+ }
+ }
+
+ void checkForExpiredTransactions(final long timeout, final Shard shard) {
+ CohortEntry cohortEntry = getCurrentCohortEntry();
+ if(cohortEntry != null) {
+ if(cohortEntry.isExpired(timeout)) {
+ log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
+ name, cohortEntry.getTransactionID(), timeout);
+
+ handleAbort(cohortEntry.getTransactionID(), null, shard);
+ }
+ }
+
+ cleanupExpiredCohortEntries();
+ }
+
+ void abortPendingTransactions(final String reason, final Shard shard) {
+ if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+ return;
+ }
+
+ List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
+
+ log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
+
+ for(CohortEntry cohortEntry: cohortEntries) {
+ if(cohortEntry.getReplySender() != null) {
+ cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+ }
+ }
+ }
+
+ private List<CohortEntry> getAndClearPendingCohortEntries() {
+ List<CohortEntry> cohortEntries = new ArrayList<>();
+
+ if(currentCohortEntry != null) {
+ cohortEntries.add(currentCohortEntry);
+ cohortCache.remove(currentCohortEntry.getTransactionID());
+ currentCohortEntry = null;
+ }
+
+ for(CohortEntry cohortEntry: queuedCohortEntries) {
+ cohortEntries.add(cohortEntry);
+ cohortCache.remove(cohortEntry.getTransactionID());
+ }
+
+ queuedCohortEntries.clear();
+ return cohortEntries;
+ }
+
+ Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
+ if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ Collection<Object> messages = new ArrayList<>();
+ List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
+ for(CohortEntry cohortEntry: cohortEntries) {
+ if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
+ continue;
+ }
+
+ final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+ cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
+ @Override
+ protected BatchedModifications getModifications() {
+ if(newModifications.isEmpty() ||
+ newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+ newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
+ cohortEntry.getClientVersion()));
+ }
+
+ return newModifications.getLast();
+ }
+ });
+
+ if(!newModifications.isEmpty()) {
+ BatchedModifications last = newModifications.getLast();
+ last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
+ last.setReady(true);
+ last.setTotalMessagesSent(newModifications.size());
+ messages.addAll(newModifications);
+
+ if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
+ messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+ cohortEntry.getClientVersion()));
+ }
+
+ if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
+ messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+ cohortEntry.getClientVersion()));
+ }
+ }
+ }
+
+ return messages;
+ }
+