Shard.this.persistData(getSender(), transactionID,
new ModificationPayload(cohortEntry.getModification()));
}
- } catch (InterruptedException | ExecutionException | IOException e) {
+ } catch (Exception e) {
LOG.error(e, "{} An exception occurred while preCommitting transaction {}",
persistenceId(), cohortEntry.getTransactionID());
shardMBean.incrementFailedTransactionsCount();
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
- } catch (InterruptedException | ExecutionException e) {
+ } catch (Exception e) {
sender.tell(new akka.actor.Status.Failure(e), getSelf());
LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID);
shardMBean.incrementFailedTransactionsCount();
+ } finally {
+ commitCoordinator.currentTransactionComplete(transactionID, true);
}
-
- commitCoordinator.currentTransactionComplete(transactionID, true);
}
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {