+ void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
+ log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
+
+ cohortEntry.commit(new FutureCallback<UnsignedLong>() {
+ @Override
+ public void onSuccess(final UnsignedLong result) {
+ final TransactionIdentifier txId = cohortEntry.getTransactionId();
+ log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
+ sender);
+ cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
+
+ cohortCache.remove(cohortEntry.getTransactionId());
+ sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
+ cohortEntry.getShard().self());
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ final TransactionIdentifier txId = cohortEntry.getTransactionId();
+ log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
+ cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
+
+ cohortCache.remove(cohortEntry.getTransactionId());
+ sender.tell(new Failure(failure), cohortEntry.getShard().self());
+ }
+ });