+ @Override
+ protected Optional<ActorRef> getRoleChangeNotifier() {
+ return roleChangeNotifier;
+ }
+
+ private void handleTransactionCommitTimeoutCheck() {
+ CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
+ if(cohortEntry != null) {
+ long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
+ if(elapsed > transactionCommitTimeout) {
+ LOG.warning("Current transaction {} has timed out after {} ms - aborting",
+ cohortEntry.getTransactionID(), transactionCommitTimeout);
+
+ doAbortTransaction(cohortEntry.getTransactionID(), null);
+ }
+ }
+ }
+
+ private void handleCommitTransaction(final CommitTransaction commit) {
+ final String transactionID = commit.getTransactionID();
+
+ LOG.debug("Committing transaction {}", transactionID);
+
+ // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
+ // this transaction.
+ final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
+ if(cohortEntry == null) {
+ // We're not the current Tx - the Tx was likely expired b/c it took too long in
+ // between the canCommit and commit messages.
+ IllegalStateException ex = new IllegalStateException(
+ String.format("Cannot commit transaction %s - it is not the current transaction",
+ transactionID));
+ LOG.error(ex.getMessage());
+ shardMBean.incrementFailedTransactionsCount();
+ getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
+ return;
+ }
+
+ // We perform the preCommit phase here atomically with the commit phase. This is an
+ // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
+ // coordination of preCommit across shards in case of failure but preCommit should not
+ // normally fail since we ensure only one concurrent 3-phase commit.
+
+ try {
+ // We block on the future here so we don't have to worry about possibly accessing our
+ // state on a different thread outside of our dispatcher. Also, the data store
+ // currently uses a same thread executor anyway.
+ cohortEntry.getCohort().preCommit().get();
+
+ Shard.this.persistData(getSender(), transactionID,
+ new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error(e, "An exception occurred while preCommitting transaction {}",
+ cohortEntry.getTransactionID());
+ shardMBean.incrementFailedTransactionsCount();
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+
+ cohortEntry.updateLastAccessTime();
+ }
+
+ private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
+ // With persistence enabled, this method is called via applyState by the leader strategy
+ // after the commit has been replicated to a majority of the followers.
+
+ CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
+ if(cohortEntry == null) {
+ // The transaction is no longer the current commit. This can happen if the transaction
+ // was aborted prior, most likely due to timeout in the front-end. We need to finish
+ // committing the transaction though since it was successfully persisted and replicated
+ // however we can't use the original cohort b/c it was already preCommitted and may
+ // conflict with the current commit or may have been aborted so we commit with a new
+ // transaction.
+ cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
+ if(cohortEntry != null) {
+ commitWithNewTransaction(cohortEntry.getModification());
+ sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+ } else {
+ // This really shouldn't happen - it likely means that persistence or replication
+ // took so long to complete such that the cohort entry was expired from the cache.
+ IllegalStateException ex = new IllegalStateException(
+ String.format("Could not finish committing transaction %s - no CohortEntry found",
+ transactionID));
+ LOG.error(ex.getMessage());
+ sender.tell(new akka.actor.Status.Failure(ex), getSelf());
+ }
+
+ return;
+ }
+
+ LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
+
+ try {
+ // We block on the future here so we don't have to worry about possibly accessing our
+ // state on a different thread outside of our dispatcher. Also, the data store
+ // currently uses a same thread executor anyway.
+ cohortEntry.getCohort().commit().get();
+
+ sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+
+ } catch (InterruptedException | ExecutionException e) {
+ sender.tell(new akka.actor.Status.Failure(e), getSelf());
+
+ LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
+ shardMBean.incrementFailedTransactionsCount();
+ }
+
+ commitCoordinator.currentTransactionComplete(transactionID, true);
+ }
+
+ private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
+ LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
+ commitCoordinator.handleCanCommit(canCommit, getSender(), self());
+ }