- // We block on the future here so we don't have to worry about possibly accessing our
- // state on a different thread outside of our dispatcher. Also, the data store
- // currently uses a same thread executor anyway.
- cohortEntry.getCohort().preCommit().get();
-
- // If we do not have any followers and we are not using persistence
- // or if cohortEntry has no modifications
- // we can apply modification to the state immediately
- if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
- applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
- } else {
- Shard.this.persistData(getSender(), transactionID,
- new ModificationPayload(cohortEntry.getModification()));
- }
- } catch (Exception e) {
- LOG.error("{} An exception occurred while preCommitting transaction {}",
- persistenceId(), cohortEntry.getTransactionID(), e);
- shardMBean.incrementFailedTransactionsCount();
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
- }
-
- cohortEntry.updateLastAccessTime();
- }
-
- private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
- // With persistence enabled, this method is called via applyState by the leader strategy
- // after the commit has been replicated to a majority of the followers.
-
- CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if(cohortEntry == null) {
- // The transaction is no longer the current commit. This can happen if the transaction
- // was aborted prior, most likely due to timeout in the front-end. We need to finish
- // committing the transaction though since it was successfully persisted and replicated
- // however we can't use the original cohort b/c it was already preCommitted and may
- // conflict with the current commit or may have been aborted so we commit with a new
- // transaction.
- cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
- if(cohortEntry != null) {
- commitWithNewTransaction(cohortEntry.getModification());
- sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
- } else {
- // This really shouldn't happen - it likely means that persistence or replication
- // took so long to complete such that the cohort entry was expired from the cache.
- IllegalStateException ex = new IllegalStateException(
- String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
- persistenceId(), transactionID));
- LOG.error(ex.getMessage());
- sender.tell(new akka.actor.Status.Failure(ex), getSelf());
+ if (!isLeader() || !isLeaderActive()) {
+ LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}.",
+ persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
+ throw new NotLeaderException(getSelf());