+ processNextPendingTransaction();
+ }
+
+ private void failPreCommit(final Exception cause) {
+ shard.getShardMBean().incrementFailedTransactionsCount();
+ pendingTransactions.poll().cohort.failedPreCommit(cause);
+ processNextPendingTransaction();
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ void startPreCommit(final SimpleShardDataTreeCohort cohort) {
+ final CommitEntry entry = pendingTransactions.peek();
+ Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
+
+ final SimpleShardDataTreeCohort current = entry.cohort;
+ Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
+
+ LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier());
+
+ final DataTreeCandidateTip candidate;
+ try {
+ candidate = tip.prepare(cohort.getDataTreeModification());
+ cohort.userPreCommit(candidate);
+ } catch (ExecutionException | TimeoutException | RuntimeException e) {
+ failPreCommit(e);
+ return;
+ }
+
+ // Set the tip of the data tree.
+ tip = Verify.verifyNotNull(candidate);
+
+ entry.lastAccess = readTime();
+
+ pendingTransactions.remove();
+ pendingCommits.add(entry);
+
+ LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+
+ cohort.successfulPreCommit(candidate);
+
+ processNextPendingTransaction();
+ }
+
+ private void failCommit(final Exception cause) {
+ shard.getShardMBean().incrementFailedTransactionsCount();
+ pendingFinishCommits.poll().cohort.failedCommit(cause);
+ processNextPending();
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void finishCommit(final SimpleShardDataTreeCohort cohort) {
+ final TransactionIdentifier txId = cohort.getIdentifier();
+ final DataTreeCandidate candidate = cohort.getCandidate();
+
+ LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
+
+ if (tip == candidate) {
+ // All pending candidates have been committed, reset the tip to the data tree.
+ tip = dataTree;
+ }
+
+ try {
+ dataTree.commit(candidate);
+ } catch (Exception e) {
+ LOG.error("{}: Failed to commit transaction {}", logContext, txId, e);
+ failCommit(e);
+ return;
+ }
+
+ shard.getShardMBean().incrementCommittedTransactionCount();
+ shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
+
+ // FIXME: propagate journal index
+ pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+
+ LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+ notifyListeners(candidate);
+
+ processNextPending();
+ }
+
+ void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
+ final CommitEntry entry = pendingCommits.peek();
+ Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
+
+ final SimpleShardDataTreeCohort current = entry.cohort;
+ if (!cohort.equals(current)) {
+ LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.getIdentifier());
+ return;
+ }
+
+ LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
+
+ final TransactionIdentifier txId = cohort.getIdentifier();
+ final Payload payload;
+ try {
+ payload = CommitTransactionPayload.create(txId, candidate);
+ } catch (IOException e) {
+ LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
+ pendingCommits.poll().cohort.failedCommit(e);
+ processNextPending();
+ return;
+ }
+
+ // We process next transactions pending canCommit before we call persistPayload to possibly progress subsequent
+ // transactions to the COMMIT_PENDING state so the payloads can be batched for replication. This is done for
+ // single-shard transactions that immediately transition from canCommit to preCommit to commit. Note that
+ // if the next pending transaction is progressed to COMMIT_PENDING and this method (startCommit) is called,
+ // the next transaction will not attempt to replicate b/c the current transaction is still at the head of the
+ // pendingCommits queue.
+ processNextPendingTransaction();
+
+ // After processing next pending transactions, we can now remove the current transaction from pendingCommits.
+ // Note this must be done before the call to peekNextPendingCommit below so we check the next transaction
+ // in order to properly determine the batchHint flag for the call to persistPayload.
+ pendingCommits.remove();
+ pendingFinishCommits.add(entry);
+
+ // See if the next transaction is pending commit (ie in the COMMIT_PENDING state) so it can be batched with
+ // this transaction for replication.
+ boolean replicationBatchHint = peekNextPendingCommit();
+
+ // Once completed, we will continue via payloadReplicationComplete
+ shard.persistPayload(txId, payload, replicationBatchHint);
+
+ entry.lastAccess = shard.ticker().read();
+
+ LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
+
+ // Process the next transaction pending commit, if any. If there is one it will be batched with this
+ // transaction for replication.
+ processNextPendingCommit();
+ }
+
+ Collection<ActorRef> getCohortActors() {
+ return cohortRegistry.getCohortActors();
+ }
+
+ void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
+ cohortRegistry.process(sender, message);
+ }
+
+ @Override
+ ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final Exception failure) {
+ final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure);
+ pendingTransactions.add(new CommitEntry(cohort, readTime()));
+ return cohort;
+ }
+
+ @Override
+ ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+ SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
+ cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+ pendingTransactions.add(new CommitEntry(cohort, readTime()));
+ return cohort;
+ }
+
+ // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
+ // the newReadWriteTransaction()
+ ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+ if (txId.getHistoryId().getHistoryId() == 0) {
+ return createReadyCohort(txId, mod);
+ }
+
+ return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
+ }
+
+ @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
+ void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
+ final Function<SimpleShardDataTreeCohort, Optional<Long>> accessTimeUpdater) {
+ final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
+ final long now = readTime();
+
+ final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
+ !pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
+ final CommitEntry currentTx = currentQueue.peek();
+ if (currentTx == null) {
+ // Empty queue, no-op
+ return;
+ }
+
+ long delta = now - currentTx.lastAccess;
+ if (delta < timeout) {
+ // Not expired yet, bail
+ return;
+ }
+
+ final Optional<Long> updateOpt = accessTimeUpdater.apply(currentTx.cohort);
+ if (updateOpt.isPresent()) {
+ final long newAccess = updateOpt.get().longValue();
+ final long newDelta = now - newAccess;
+ if (newDelta < delta) {
+ LOG.debug("{}: Updated current transaction {} access time", logContext,
+ currentTx.cohort.getIdentifier());
+ currentTx.lastAccess = newAccess;
+ delta = newDelta;
+ }
+
+ if (delta < timeout) {
+ // Not expired yet, bail
+ return;
+ }
+ }
+
+ final long deltaMillis = TimeUnit.NANOSECONDS.toMillis(delta);
+ final State state = currentTx.cohort.getState();
+
+ LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
+ currentTx.cohort.getIdentifier(), deltaMillis, state);
+ boolean processNext = true;
+ final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
+ + deltaMillis + "ms");
+
+ switch (state) {
+ case CAN_COMMIT_PENDING:
+ currentQueue.remove().cohort.failedCanCommit(cohortFailure);
+ break;
+ case CAN_COMMIT_COMPLETE:
+ // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
+ // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
+ // in PRE_COMMIT_COMPLETE is changed.
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
+ break;
+ case PRE_COMMIT_PENDING:
+ currentQueue.remove().cohort.failedPreCommit(cohortFailure);
+ break;
+ case PRE_COMMIT_COMPLETE:
+ // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
+ // are ready we should commit the transaction, not abort it. Our current software stack does
+ // not allow us to do that consistently, because we persist at the time of commit, hence
+ // we can end up in a state where we have pre-committed a transaction, then a leader failover
+ // occurred ... the new leader does not see the pre-committed transaction and does not have
+ // a running timer. To fix this we really need two persistence events.
+ //
+ // The first one, done at pre-commit time will hold the transaction payload. When consensus
+ // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
+ // apply the state in this event.
+ //
+ // The second one, done at commit (or abort) time holds only the transaction identifier and
+ // signals to followers that the state should (or should not) be applied.
+ //
+ // In order to make the pre-commit timer working across failovers, though, we need
+ // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
+ // restart the timer.
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
+ break;
+ case COMMIT_PENDING:
+ LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
+ currentTx.cohort.getIdentifier());
+ currentTx.lastAccess = now;
+ processNext = false;
+ return;
+ case READY:
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
+ break;
+ case ABORTED:
+ case COMMITTED:
+ case FAILED:
+ default:
+ currentQueue.remove();
+ }
+
+ if (processNext) {
+ processNextPending();
+ }
+ }
+
+ boolean startAbort(final SimpleShardDataTreeCohort cohort) {
+ final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
+ pendingTransactions).iterator();
+ if (!it.hasNext()) {
+ LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
+ return true;
+ }
+
+ // First entry is special, as it may already be committing
+ final CommitEntry first = it.next();
+ if (cohort.equals(first.cohort)) {
+ if (cohort.getState() != State.COMMIT_PENDING) {
+ LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
+ cohort.getIdentifier());
+
+ it.remove();
+ if (cohort.getCandidate() != null) {
+ rebaseTransactions(it, dataTree);
+ }
+
+ processNextPending();
+ return true;
+ }
+
+ LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ return false;
+ }
+
+ TipProducingDataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree);
+ while (it.hasNext()) {
+ final CommitEntry e = it.next();
+ if (cohort.equals(e.cohort)) {
+ LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
+
+ it.remove();
+ if (cohort.getCandidate() != null) {
+ rebaseTransactions(it, newTip);
+ }
+
+ return true;
+ } else {
+ newTip = MoreObjects.firstNonNull(e.cohort.getCandidate(), newTip);
+ }
+ }
+
+ LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+ return true;
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void rebaseTransactions(final Iterator<CommitEntry> iter, @Nonnull final TipProducingDataTreeTip newTip) {
+ tip = Preconditions.checkNotNull(newTip);
+ while (iter.hasNext()) {
+ final SimpleShardDataTreeCohort cohort = iter.next().cohort;
+ if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
+ LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+
+ try {
+ tip.validate(cohort.getDataTreeModification());
+ } catch (DataValidationFailedException | RuntimeException e) {
+ LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+ cohort.reportFailure(e);
+ }
+ } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
+ LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+
+ try {
+ tip.validate(cohort.getDataTreeModification());
+ DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
+ cohort.userPreCommit(candidate);
+
+ cohort.setNewCandidate(candidate);
+ tip = candidate;
+ } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+ LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+ cohort.reportFailure(e);
+ }
+ }
+ }
+ }
+
+ void setRunOnPendingTransactionsComplete(final Runnable operation) {
+ runOnPendingTransactionsComplete = operation;
+ maybeRunOperationOnPendingTransactionsComplete();
+ }
+
+ private void maybeRunOperationOnPendingTransactionsComplete() {
+ if (runOnPendingTransactionsComplete != null && !anyPendingTransactions()) {
+ LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+ runOnPendingTransactionsComplete);
+
+ runOnPendingTransactionsComplete.run();
+ runOnPendingTransactionsComplete = null;
+ }
+ }
+
+ ShardStats getStats() {
+ return shard.getShardMBean();
+ }
+
+ Iterator<SimpleShardDataTreeCohort> cohortIterator() {
+ return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
+ e -> e.cohort).iterator();
+ }
+
+ void removeTransactionChain(final LocalHistoryIdentifier id) {
+ if (transactionChains.remove(id) != null) {
+ LOG.debug("{}: Removed transaction chain {}", logContext, id);