+
+ return createReadyCohort(transaction.getIdentifier(), snapshot);
+ }
+
+ void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+ LOG.debug("{}: purging transaction {}", logContext, id);
+ replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+ }
+
+ public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
+ return dataTree.takeSnapshot().readNode(path);
+ }
+
+ DataTreeSnapshot takeSnapshot() {
+ return dataTree.takeSnapshot();
+ }
+
+ @VisibleForTesting
+ public DataTreeModification newModification() {
+ return dataTree.takeSnapshot().newModification();
+ }
+
+ /**
+ * Commits a modification.
+ *
+ * @deprecated This method violates DataTree containment and will be removed.
+ */
+ @VisibleForTesting
+ @Deprecated
+ public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
+ // Direct modification commit is a utility, which cannot be used while we have transactions in-flight
+ Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending");
+
+ modification.ready();
+ dataTree.validate(modification);
+ DataTreeCandidate candidate = dataTree.prepare(modification);
+ dataTree.commit(candidate);
+ return candidate;
+ }
+
+ public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
+ Collection<ShardDataTreeCohort> ret = new ArrayList<>(getQueueSize());
+
+ for (CommitEntry entry: pendingFinishCommits) {
+ ret.add(entry.cohort);
+ }
+
+ for (CommitEntry entry: pendingCommits) {
+ ret.add(entry.cohort);
+ }
+
+ for (CommitEntry entry: pendingTransactions) {
+ ret.add(entry.cohort);
+ }
+
+ pendingFinishCommits.clear();
+ pendingCommits.clear();
+ pendingTransactions.clear();
+ tip = dataTree;
+ return ret;
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void processNextPendingTransaction() {
+ processNextPending(pendingTransactions, State.CAN_COMMIT_PENDING, entry -> {
+ final SimpleShardDataTreeCohort cohort = entry.cohort;
+ final DataTreeModification modification = cohort.getDataTreeModification();
+
+ LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
+ Exception cause;
+ try {
+ cohort.throwCanCommitFailure();
+
+ tip.validate(modification);
+ LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
+ cohort.successfulCanCommit();
+ entry.lastAccess = ticker().read();
+ return;
+ } catch (ConflictingModificationAppliedException e) {
+ LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
+ e.getPath());
+ cause = new OptimisticLockFailedException("Optimistic lock failed.", e);
+ } catch (DataValidationFailedException e) {
+ LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
+ e.getPath(), e);
+
+ // For debugging purposes, allow dumping of the modification. Coupled with the above
+ // precondition log, it should allow us to understand what went on.
+ LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
+ dataTree);
+ cause = new TransactionCommitFailedException("Data did not pass validation.", e);
+ } catch (Exception e) {
+ LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
+ cause = e;
+ }
+
+ // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one
+ pendingTransactions.poll().cohort.failedCanCommit(cause);
+ });
+ }
+
+ private void processNextPending() {
+ processNextPendingCommit();
+ processNextPendingTransaction();
+ }
+
+ private void processNextPending(final Queue<CommitEntry> queue, final State allowedState,
+ final Consumer<CommitEntry> processor) {
+ while (!queue.isEmpty()) {
+ final CommitEntry entry = queue.peek();
+ final SimpleShardDataTreeCohort cohort = entry.cohort;
+
+ if (cohort.isFailed()) {
+ LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
+ queue.remove();
+ continue;
+ }
+
+ if (cohort.getState() == allowedState) {
+ processor.accept(entry);
+ }
+
+ break;
+ }
+
+ maybeRunOperationOnPendingTransactionsComplete();
+ }
+
+ private void processNextPendingCommit() {
+ processNextPending(pendingCommits, State.COMMIT_PENDING,
+ entry -> startCommit(entry.cohort, entry.cohort.getCandidate()));
+ }
+
+ private boolean peekNextPendingCommit() {
+ final CommitEntry first = pendingCommits.peek();
+ return first != null && first.cohort.getState() == State.COMMIT_PENDING;
+ }
+
+ void startCanCommit(final SimpleShardDataTreeCohort cohort) {
+ final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
+ if (!cohort.equals(current)) {
+ LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
+ return;
+ }
+
+ processNextPendingTransaction();
+ }
+
+ private void failPreCommit(final Exception cause) {
+ shard.getShardMBean().incrementFailedTransactionsCount();
+ pendingTransactions.poll().cohort.failedPreCommit(cause);
+ processNextPendingTransaction();
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ void startPreCommit(final SimpleShardDataTreeCohort cohort) {
+ final CommitEntry entry = pendingTransactions.peek();
+ Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
+
+ final SimpleShardDataTreeCohort current = entry.cohort;
+ Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
+
+ LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier());
+
+ final DataTreeCandidateTip candidate;
+ try {
+ candidate = tip.prepare(cohort.getDataTreeModification());
+ cohort.userPreCommit(candidate);
+ } catch (ExecutionException | TimeoutException | RuntimeException e) {
+ failPreCommit(e);
+ return;
+ }
+
+ // Set the tip of the data tree.
+ tip = Verify.verifyNotNull(candidate);
+
+ entry.lastAccess = ticker().read();
+
+ pendingTransactions.remove();
+ pendingCommits.add(entry);
+
+ LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+
+ cohort.successfulPreCommit(candidate);
+
+ processNextPendingTransaction();
+ }
+
+ private void failCommit(final Exception cause) {
+ shard.getShardMBean().incrementFailedTransactionsCount();
+ pendingFinishCommits.poll().cohort.failedCommit(cause);
+ processNextPending();
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void finishCommit(final SimpleShardDataTreeCohort cohort) {
+ final TransactionIdentifier txId = cohort.getIdentifier();
+ final DataTreeCandidate candidate = cohort.getCandidate();
+
+ LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
+
+ if (tip == candidate) {
+ // All pending candidates have been committed, reset the tip to the data tree.
+ tip = dataTree;
+ }
+
+ try {
+ dataTree.commit(candidate);
+ } catch (Exception e) {
+ LOG.error("{}: Failed to commit transaction {}", logContext, txId, e);
+ failCommit(e);
+ return;
+ }
+
+ shard.getShardMBean().incrementCommittedTransactionCount();
+ shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
+
+ // FIXME: propagate journal index
+ pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+
+ LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+ notifyListeners(candidate);
+
+ processNextPending();
+ }
+
+ void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
+ final CommitEntry entry = pendingCommits.peek();
+ Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
+
+ final SimpleShardDataTreeCohort current = entry.cohort;
+ if (!cohort.equals(current)) {
+ LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.getIdentifier());
+ return;
+ }
+
+ LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
+
+ final TransactionIdentifier txId = cohort.getIdentifier();
+ final Payload payload;
+ try {
+ payload = CommitTransactionPayload.create(txId, candidate);
+ } catch (IOException e) {
+ LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
+ pendingCommits.poll().cohort.failedCommit(e);
+ processNextPending();
+ return;
+ }
+
+ // We process next transactions pending canCommit before we call persistPayload to possibly progress subsequent
+ // transactions to the COMMIT_PENDING state so the payloads can be batched for replication. This is done for
+ // single-shard transactions that immediately transition from canCommit to preCommit to commit. Note that
+ // if the next pending transaction is progressed to COMMIT_PENDING and this method (startCommit) is called,
+ // the next transaction will not attempt to replicate b/c the current transaction is still at the head of the
+ // pendingCommits queue.
+ processNextPendingTransaction();
+
+ // After processing next pending transactions, we can now remove the current transaction from pendingCommits.
+ // Note this must be done before the call to peekNextPendingCommit below so we check the next transaction
+ // in order to properly determine the batchHint flag for the call to persistPayload.
+ pendingCommits.remove();
+ pendingFinishCommits.add(entry);
+
+ // See if the next transaction is pending commit (ie in the COMMIT_PENDING state) so it can be batched with
+ // this transaction for replication.
+ boolean replicationBatchHint = peekNextPendingCommit();
+
+ // Once completed, we will continue via payloadReplicationComplete
+ shard.persistPayload(txId, payload, replicationBatchHint);
+
+ entry.lastAccess = shard.ticker().read();
+
+ LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
+
+ // Process the next transaction pending commit, if any. If there is one it will be batched with this
+ // transaction for replication.
+ processNextPendingCommit();
+ }
+
+ Collection<ActorRef> getCohortActors() {
+ return cohortRegistry.getCohortActors();
+ }
+
+ void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
+ cohortRegistry.process(sender, message);
+ }
+
+ @Override
+ ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final Exception failure) {
+ SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure);
+ pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
+ return cohort;
+ }
+
+ @Override
+ ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+ SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId,
+ cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+ pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
+ return cohort;
+ }
+
+ // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
+ // the newReadWriteTransaction()
+ ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+ if (txId.getHistoryId().getHistoryId() == 0) {
+ return createReadyCohort(txId, mod);
+ }
+
+ return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod);