- public boolean isCurrentTransaction(String transactionID) {
- return currentCohortEntry != null &&
- currentCohortEntry.getTransactionID().equals(transactionID);
+ void finishCommit(final @NonNull ActorRef sender, final @NonNull CohortEntry cohortEntry) {
+ log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
+
+ cohortEntry.commit(new FutureCallback<UnsignedLong>() {
+ @Override
+ public void onSuccess(final UnsignedLong result) {
+ final TransactionIdentifier txId = cohortEntry.getTransactionId();
+ log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
+ sender);
+
+ cohortCache.remove(cohortEntry.getTransactionId());
+ sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
+ cohortEntry.getShard().self());
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ final TransactionIdentifier txId = cohortEntry.getTransactionId();
+ log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
+
+ cohortCache.remove(cohortEntry.getTransactionId());
+ sender.tell(new Failure(failure), cohortEntry.getShard().self());
+ }
+ });