- private void doCanCommit(final CohortEntry cohortEntry) {
-
- boolean canCommit = false;
- try {
- // We block on the future here so we don't have to worry about possibly accessing our
- // state on a different thread outside of our dispatcher. Also, the data store
- // currently uses a same thread executor anyway.
- canCommit = cohortEntry.getCohort().canCommit().get();
-
- if(cohortEntry.isDoImmediateCommit()) {
- if(canCommit) {
- doCommit(cohortEntry);
- } else {
- cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
- "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
- }
- } else {
- cohortEntry.getReplySender().tell(
- canCommit ? CanCommitTransactionReply.YES.toSerializable() :
- CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
- }
- } catch (Exception e) {
- log.debug("{}: An exception occurred during canCommit: {}", name, e);
-
- Throwable failure = e;
- if(e instanceof ExecutionException) {
- failure = e.getCause();
- }
-
- cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
- } finally {
- if(!canCommit) {
- // Remove the entry from the cache now.
- currentTransactionComplete(cohortEntry.getTransactionID(), true);
- }
- }
- }
-
- private boolean doCommit(CohortEntry cohortEntry) {
- log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
-
- boolean success = false;