import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private final Shard shard;
private Runnable runOnPendingTransactionsComplete;
+ /**
+ * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a
+ * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another
+ * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current
+ * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself.
+ */
+ private TipProducingDataTreeTip tip;
+
private SchemaContext schemaContext;
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
this.logContext = Preconditions.checkNotNull(logContext);
this.metadata = ImmutableList.copyOf(metadata);
+ tip = dataTree;
}
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+ final YangInstanceIdentifier root,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
- this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType),
+ this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root),
treeChangeListenerPublisher, dataChangeListenerPublisher, logContext);
}
@VisibleForTesting
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
- this(shard, schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
+ this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
+ new DefaultShardDataTreeChangeListenerPublisher(),
new DefaultShardDataChangeListenerPublisher(), "");
}
@VisibleForTesting
@Deprecated
public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
+ // Direct modification commit is a utility, which cannot be used while we have transactions in-flight
+ Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending");
+
modification.ready();
dataTree.validate(modification);
DataTreeCandidate candidate = dataTree.prepare(modification);
}
pendingTransactions.clear();
+ tip = dataTree;
return ret;
}
LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
Exception cause;
try {
- dataTree.validate(modification);
+ tip.validate(modification);
LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
cohort.successfulCanCommit();
entry.lastAccess = shard.ticker().read();
Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
final DataTreeCandidateTip candidate;
try {
- candidate = dataTree.prepare(cohort.getDataTreeModification());
+ candidate = tip.prepare(cohort.getDataTreeModification());
} catch (Exception e) {
failPreCommit(e);
return;
return;
}
+ // Set the tip of the data tree.
+ tip = Verify.verifyNotNull(candidate);
+
entry.lastAccess = shard.ticker().read();
cohort.successfulPreCommit(candidate);
}
return;
}
+ // All pending candidates have been committed, reset the tip to the data tree
+ if (tip == candidate) {
+ tip = dataTree;
+ }
+
shard.getShardMBean().incrementCommittedTransactionCount();
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
return cohort;
}
- @SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED", "DB_DUPLICATE_SWITCH_CLAUSES"},
- justification = "See inline comments below.")
+ @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = shard.ticker().read();
boolean processNext = true;
switch (currentTx.cohort.getState()) {
case CAN_COMMIT_PENDING:
- pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
+ pendingTransactions.remove().cohort.failedCanCommit(new TimeoutException());
break;
case CAN_COMMIT_COMPLETE:
// The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
// whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
// in PRE_COMMIT_COMPLETE is changed.
- pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+ pendingTransactions.remove().cohort.reportFailure(new TimeoutException());
break;
case PRE_COMMIT_PENDING:
- pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException());
+ pendingTransactions.remove().cohort.failedPreCommit(new TimeoutException());
break;
case PRE_COMMIT_COMPLETE:
// FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
// In order to make the pre-commit timer working across failovers, though, we need
// a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
// restart the timer.
- pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+ pendingTransactions.remove().cohort.reportFailure(new TimeoutException());
break;
case COMMIT_PENDING:
LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
case FAILED:
case READY:
default:
- // The suppression of the FindBugs "RV_RETURN_VALUE_IGNORED" warning pertains to this line. In
- // this case, we just want to drop the current entry that expired and thus ignore the return value.
- // In fact we really shouldn't hit this case but we handle all enums for completeness.
- pendingTransactions.poll();
+ pendingTransactions.remove();
}
if (processNext) {
}
}
- void startAbort(final SimpleShardDataTreeCohort cohort) {
+ boolean startAbort(final SimpleShardDataTreeCohort cohort) {
final Iterator<CommitEntry> it = pendingTransactions.iterator();
if (!it.hasNext()) {
LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
- return;
+ return true;
}
// First entry is special, as it may already be committing
final CommitEntry first = it.next();
if (cohort.equals(first.cohort)) {
if (cohort.getState() != State.COMMIT_PENDING) {
- LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
+ LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
cohort.getIdentifier());
- pendingTransactions.remove();
+ it.remove();
+ rebasePreCommittedTransactions(it, dataTree);
processNextTransaction();
- } else {
- LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ return true;
}
- return;
+ LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ return false;
}
+ TipProducingDataTreeTip newTip = dataTree;
while (it.hasNext()) {
final CommitEntry e = it.next();
if (cohort.equals(e.cohort)) {
LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
it.remove();
- return;
+ rebasePreCommittedTransactions(it, newTip);
+ return true;
+ } else {
+ newTip = cohort.getCandidate();
}
}
LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+ return true;
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void rebasePreCommittedTransactions(Iterator<CommitEntry> iter, TipProducingDataTreeTip newTip) {
+ tip = newTip;
+ while (iter.hasNext()) {
+ final SimpleShardDataTreeCohort cohort = iter.next().cohort;
+ if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
+ LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+
+ try {
+ tip.validate(cohort.getDataTreeModification());
+ } catch (DataValidationFailedException | RuntimeException e) {
+ LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+ cohort.reportFailure(e);
+ }
+ } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
+ LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+
+ try {
+ tip.validate(cohort.getDataTreeModification());
+ DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
+ cohort.userPreCommit(candidate);
+
+ cohort.setNewCandidate(candidate);
+ tip = candidate;
+ } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+ LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+ cohort.reportFailure(e);
+ }
+ }
+ }
}
void setRunOnPendingTransactionsComplete(final Runnable operation) {