+ void startCanCommit(final SimpleShardDataTreeCohort cohort) {
+ final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
+ if (!cohort.equals(current)) {
+ LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
+ return;
+ }
+
+ processNextTransaction();
+ }
+
+ private void failPreCommit(final Exception cause) {
+ shard.getShardMBean().incrementFailedTransactionsCount();
+ pendingTransactions.poll().cohort.failedPreCommit(cause);
+ processNextTransaction();
+ }
+
+ void startPreCommit(final SimpleShardDataTreeCohort cohort) {
+ final CommitEntry entry = pendingTransactions.peek();
+ Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
+
+ final SimpleShardDataTreeCohort current = entry.cohort;
+ Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
+ final DataTreeCandidateTip candidate;
+ try {
+ candidate = dataTree.prepare(cohort.getDataTreeModification());
+ } catch (Exception e) {
+ failPreCommit(e);
+ return;
+ }
+
+ try {
+ cohort.userPreCommit(candidate);
+ } catch (ExecutionException | TimeoutException e) {
+ failPreCommit(e);
+ return;
+ }
+
+ entry.lastAccess = shard.ticker().read();
+ cohort.successfulPreCommit(candidate);
+ }
+
+ private void failCommit(final Exception cause) {
+ shard.getShardMBean().incrementFailedTransactionsCount();
+ pendingTransactions.poll().cohort.failedCommit(cause);
+ processNextTransaction();
+ }
+
+ private void finishCommit(final SimpleShardDataTreeCohort cohort) {
+ final TransactionIdentifier txId = cohort.getIdentifier();
+ final DataTreeCandidate candidate = cohort.getCandidate();
+
+ LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
+
+ try {
+ try {
+ dataTree.commit(candidate);
+ } catch (IllegalStateException e) {
+ // We may get a "store tree and candidate base differ" IllegalStateException from commit under
+ // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
+ // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
+ // applying it to the state. We then become the leader and a second tx is pre-committed and
+ // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
+ // candidate via applyState prior to the second tx. Since the second tx has already been
+ // pre-committed, when it gets here to commit it will get an IllegalStateException.
+
+ // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
+ // solution will be forthcoming.
+
+ LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e);
+ applyForeignCandidate(txId, candidate);
+ }
+ } catch (Exception e) {
+ failCommit(e);
+ return;
+ }
+
+ shard.getShardMBean().incrementCommittedTransactionCount();
+ shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
+
+ // FIXME: propagate journal index
+
+ pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+
+ LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+ notifyListeners(candidate);
+
+ processNextTransaction();
+ }
+
+ void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
+ final CommitEntry entry = pendingTransactions.peek();
+ Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
+
+ final SimpleShardDataTreeCohort current = entry.cohort;
+ Verify.verify(cohort.equals(current), "Attempted to commit %s while %s is pending", cohort, current);
+
+ if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
+ LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
+ finishCommit(cohort);
+ return;
+ }
+
+ final TransactionIdentifier txId = cohort.getIdentifier();
+ final Payload payload;
+ try {
+ payload = CommitTransactionPayload.create(txId, candidate);
+ } catch (IOException e) {
+ LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
+ pendingTransactions.poll().cohort.failedCommit(e);
+ return;
+ }
+
+ // Once completed, we will continue via payloadReplicationComplete
+ entry.lastAccess = shard.ticker().read();
+ shard.persistPayload(txId, payload);
+ LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
+ }
+
+ private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) {
+ final CommitEntry current = pendingTransactions.peek();
+ if (current == null) {
+ LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ return;
+ }
+
+ if (!current.cohort.getIdentifier().equals(txId)) {
+ LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+ current.cohort.getIdentifier(), txId);
+ return;
+ }
+
+ finishCommit(current.cohort);
+ }
+
+ void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) {
+ // For now we do not care about anything else but transactions
+ Verify.verify(identifier instanceof TransactionIdentifier);
+ payloadReplicationComplete((TransactionIdentifier)identifier, payload);
+ }
+
+ void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
+ cohortRegistry.process(sender, message);
+ }
+
+ ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
+ final DataTreeModification modification) {
+ SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
+ cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+ pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read()));
+ return cohort;
+ }
+
+ void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload)
+ throws DataValidationFailedException, IOException {
+ applyForeignCandidate(identifier, payload.getCandidate().getValue());
+ }
+
+ void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
+ final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
+ final long now = shard.ticker().read();
+ final CommitEntry currentTx = pendingTransactions.peek();
+ if (currentTx != null && currentTx.lastAccess + timeout < now) {
+ LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
+ currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
+ boolean processNext = true;
+ switch (currentTx.cohort.getState()) {
+ case CAN_COMMIT_PENDING:
+ pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
+ break;
+ case CAN_COMMIT_COMPLETE:
+ pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+ break;
+ case PRE_COMMIT_PENDING:
+ pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException());
+ break;
+ case PRE_COMMIT_COMPLETE:
+ // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
+ // are ready we should commit the transaction, not abort it. Our current software stack does
+ // not allow us to do that consistently, because we persist at the time of commit, hence
+ // we can end up in a state where we have pre-committed a transaction, then a leader failover
+ // occurred ... the new leader does not see the pre-committed transaction and does not have
+ // a running timer. To fix this we really need two persistence events.
+ //
+ // The first one, done at pre-commit time will hold the transaction payload. When consensus
+ // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
+ // apply the state in this event.
+ //
+ // The second one, done at commit (or abort) time holds only the transaction identifier and
+ // signals to followers that the state should (or should not) be applied.
+ //
+ // In order to make the pre-commit timer working across failovers, though, we need
+ // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
+ // restart the timer.
+ pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+ break;
+ case COMMIT_PENDING:
+ LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
+ currentTx.cohort.getIdentifier());
+ currentTx.lastAccess = now;
+ processNext = false;
+ return;
+ case ABORTED:
+ case COMMITTED:
+ case FAILED:
+ case READY:
+ default:
+ pendingTransactions.poll();
+ }
+
+ if (processNext) {
+ processNextTransaction();
+ }
+ }
+ }
+
+ void startAbort(final SimpleShardDataTreeCohort cohort) {
+ final Iterator<CommitEntry> it = pendingTransactions.iterator();
+ if (!it.hasNext()) {
+ LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
+ return;
+ }
+
+ // First entry is special, as it may already be committing
+ final CommitEntry first = it.next();
+ if (cohort.equals(first.cohort)) {
+ if (cohort.getState() != State.COMMIT_PENDING) {
+ LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
+ cohort.getIdentifier());
+ pendingTransactions.poll();
+ processNextTransaction();
+ } else {
+ LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ }
+
+ return;
+ }
+
+ while (it.hasNext()) {
+ final CommitEntry e = it.next();
+ if (cohort.equals(e.cohort)) {
+ LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
+ it.remove();
+ return;
+ }
+ }
+
+ LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+ }
+
+ void setRunOnPendingTransactionsComplete(final Runnable operation) {
+ runOnPendingTransactionsComplete = operation;
+ maybeRunOperationOnPendingTransactionsComplete();
+ }
+
+ private void maybeRunOperationOnPendingTransactionsComplete() {
+ if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+ LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+ runOnPendingTransactionsComplete);
+
+ runOnPendingTransactionsComplete.run();
+ runOnPendingTransactionsComplete = null;
+ }
+ }