import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.primitives.UnsignedLong;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
* e.g. it does not expose public interfaces and assumes it is only ever called from a
* single thread.
*
+ * <p>
* This class is not part of the API contract and is subject to change at any time.
*/
@NotThreadSafe
private final Shard shard;
private Runnable runOnPendingTransactionsComplete;
+ /**
+ * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a
+ * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another
+ * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current
+ * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself.
+ */
+ private TipProducingDataTreeTip tip;
+
private SchemaContext schemaContext;
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
this.logContext = Preconditions.checkNotNull(logContext);
this.metadata = ImmutableList.copyOf(metadata);
+ tip = dataTree;
}
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+ final YangInstanceIdentifier root,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
- this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType),
+ this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root),
treeChangeListenerPublisher, dataChangeListenerPublisher, logContext);
}
@VisibleForTesting
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
- this(shard, schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
+ this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
+ new DefaultShardDataTreeChangeListenerPublisher(),
new DefaultShardDataChangeListenerPublisher(), "");
}
- String logContext() {
+ final String logContext() {
return logContext;
}
+ final Ticker ticker() {
+ return shard.ticker();
+ }
+
public TipProducingDataTree getDataTree() {
return dataTree;
}
return schemaContext;
}
- void updateSchemaContext(final SchemaContext schemaContext) {
- dataTree.setSchemaContext(schemaContext);
- this.schemaContext = Preconditions.checkNotNull(schemaContext);
+ void updateSchemaContext(final SchemaContext newSchemaContext) {
+ dataTree.setSchemaContext(newSchemaContext);
+ this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
}
/**
return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build());
}
- private void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot,
+ private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot,
final UnaryOperator<DataTreeModification> wrapper) throws DataValidationFailedException {
final Stopwatch elapsed = Stopwatch.createStarted();
final DataTreeModification unwrapped = unwrap(mod);
dataTree.validate(unwrapped);
- dataTree.commit(dataTree.prepare(unwrapped));
+ DataTreeCandidateTip candidate = dataTree.prepare(unwrapped);
+ dataTree.commit(candidate);
+ notifyListeners(candidate);
+
LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
}
+ /**
+ * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
+ * does not perform any pruning.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ applySnapshot(snapshot, UnaryOperator.identity());
+ }
+
private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
return new PruningDataTreeModification(delegate, dataTree, schemaContext);
}
applySnapshot(snapshot, this::wrapWithPruning);
}
-
- /**
- * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
- * does not perform any pruning.
- *
- * @param snapshot Snapshot that needs to be applied
- * @throws DataValidationFailedException when the snapshot fails to apply
- */
- void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
- applySnapshot(snapshot, UnaryOperator.identity());
- }
-
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
DataTreeCandidates.applyToModification(mod, candidate);
*/
void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
if (payload instanceof CommitTransactionPayload) {
- final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+ final Entry<TransactionIdentifier, DataTreeCandidate> e =
+ ((CommitTransactionPayload) payload).getCandidate();
applyRecoveryCandidate(e.getValue());
allMetadataCommittedTransaction(e.getKey());
} else if (payload instanceof DataTreeCandidatePayload) {
*/
if (payload instanceof CommitTransactionPayload) {
if (identifier == null) {
- final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+ final Entry<TransactionIdentifier, DataTreeCandidate> e =
+ ((CommitTransactionPayload) payload).getCandidate();
applyReplicatedCandidate(e.getKey(), e.getValue());
allMetadataCommittedTransaction(e.getKey());
} else {
}
}
- private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
+ ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
if (chain == null) {
chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
final DataChangeScope scope) {
- final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
+ DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
return new SimpleEntry<>(reg, readCurrentData());
}
private Optional<DataTreeCandidate> readCurrentData() {
- final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
+ final Optional<NormalizedNode<?, ?>> currentState =
+ dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
}
- public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> registerTreeChangeListener(
- final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
- final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangeListenerPublisher.registerTreeChangeListener(
- path, listener);
+ public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>>
+ registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
+ final ListenerRegistration<DOMDataTreeChangeListener> reg =
+ treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
return new SimpleEntry<>(reg, readCurrentData());
}
final DataTreeModification snapshot = transaction.getSnapshot();
snapshot.ready();
- return createReadyCohort(transaction.getId(), snapshot);
+ return createReadyCohort(transaction.getIdentifier(), snapshot);
}
public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
}
/**
+ * Commits a modification.
+ *
* @deprecated This method violates DataTree containment and will be removed.
*/
@VisibleForTesting
@Deprecated
public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
+ // Direct modification commit is a utility, which cannot be used while we have transactions in-flight
+ Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending");
+
modification.ready();
dataTree.validate(modification);
DataTreeCandidate candidate = dataTree.prepare(modification);
public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size());
- for(CommitEntry entry: pendingTransactions) {
+ for (CommitEntry entry: pendingTransactions) {
ret.add(entry.cohort);
}
pendingTransactions.clear();
+ tip = dataTree;
return ret;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void processNextTransaction() {
while (!pendingTransactions.isEmpty()) {
final CommitEntry entry = pendingTransactions.peek();
final SimpleShardDataTreeCohort cohort = entry.cohort;
final DataTreeModification modification = cohort.getDataTreeModification();
- if(cohort.getState() != State.CAN_COMMIT_PENDING) {
+ if (cohort.getState() != State.CAN_COMMIT_PENDING) {
break;
}
LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
Exception cause;
try {
- dataTree.validate(modification);
+ tip.validate(modification);
LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
cohort.successfulCanCommit();
entry.lastAccess = shard.ticker().read();
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
- LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree);
+ LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
+ dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation.", e);
} catch (Exception e) {
LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
processNextTransaction();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
final DataTreeCandidateTip candidate;
try {
- candidate = dataTree.prepare(cohort.getDataTreeModification());
+ candidate = tip.prepare(cohort.getDataTreeModification());
} catch (Exception e) {
failPreCommit(e);
return;
return;
}
+ // Set the tip of the data tree.
+ tip = Verify.verifyNotNull(candidate);
+
entry.lastAccess = shard.ticker().read();
cohort.successfulPreCommit(candidate);
}
processNextTransaction();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void finishCommit(final SimpleShardDataTreeCohort cohort) {
final TransactionIdentifier txId = cohort.getIdentifier();
final DataTreeCandidate candidate = cohort.getCandidate();
return;
}
+ // All pending candidates have been committed, reset the tip to the data tree
+ if (tip == candidate) {
+ tip = dataTree;
+ }
+
shard.getShardMBean().incrementCommittedTransactionCount();
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
cohortRegistry.process(sender, message);
}
+ @Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
final DataTreeModification modification) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
return cohort;
}
+ @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = shard.ticker().read();
boolean processNext = true;
switch (currentTx.cohort.getState()) {
case CAN_COMMIT_PENDING:
- pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
+ pendingTransactions.remove().cohort.failedCanCommit(new TimeoutException());
break;
case CAN_COMMIT_COMPLETE:
- pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+ // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
+ // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
+ // in PRE_COMMIT_COMPLETE is changed.
+ pendingTransactions.remove().cohort.reportFailure(new TimeoutException());
break;
case PRE_COMMIT_PENDING:
- pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException());
+ pendingTransactions.remove().cohort.failedPreCommit(new TimeoutException());
break;
case PRE_COMMIT_COMPLETE:
// FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
// In order to make the pre-commit timer working across failovers, though, we need
// a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
// restart the timer.
- pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+ pendingTransactions.remove().cohort.reportFailure(new TimeoutException());
break;
case COMMIT_PENDING:
LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
case FAILED:
case READY:
default:
- pendingTransactions.poll();
+ pendingTransactions.remove();
}
if (processNext) {
}
}
- void startAbort(final SimpleShardDataTreeCohort cohort) {
+ boolean startAbort(final SimpleShardDataTreeCohort cohort) {
final Iterator<CommitEntry> it = pendingTransactions.iterator();
if (!it.hasNext()) {
LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
- return;
+ return true;
}
// First entry is special, as it may already be committing
final CommitEntry first = it.next();
if (cohort.equals(first.cohort)) {
if (cohort.getState() != State.COMMIT_PENDING) {
- LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
+ LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
cohort.getIdentifier());
- pendingTransactions.poll();
+
+ it.remove();
+ rebasePreCommittedTransactions(it, dataTree);
processNextTransaction();
- } else {
- LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ return true;
}
- return;
+ LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ return false;
}
+ TipProducingDataTreeTip newTip = dataTree;
while (it.hasNext()) {
final CommitEntry e = it.next();
if (cohort.equals(e.cohort)) {
LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
it.remove();
- return;
+ rebasePreCommittedTransactions(it, newTip);
+ return true;
+ } else {
+ newTip = cohort.getCandidate();
}
}
LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+ return true;
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void rebasePreCommittedTransactions(Iterator<CommitEntry> iter, TipProducingDataTreeTip newTip) {
+ tip = newTip;
+ while (iter.hasNext()) {
+ final SimpleShardDataTreeCohort cohort = iter.next().cohort;
+ if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
+ LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+
+ try {
+ tip.validate(cohort.getDataTreeModification());
+ } catch (DataValidationFailedException | RuntimeException e) {
+ LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+ cohort.reportFailure(e);
+ }
+ } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
+ LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+
+ try {
+ tip.validate(cohort.getDataTreeModification());
+ DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
+ cohort.userPreCommit(candidate);
+
+ cohort.setNewCandidate(candidate);
+ tip = candidate;
+ } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+ LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+ cohort.reportFailure(e);
+ }
+ }
+ }
}
void setRunOnPendingTransactionsComplete(final Runnable operation) {
}
private void maybeRunOperationOnPendingTransactionsComplete() {
- if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
- LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
- runOnPendingTransactionsComplete);
-
- runOnPendingTransactionsComplete.run();
- runOnPendingTransactionsComplete = null;
- }
- }
+ if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+ LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+ runOnPendingTransactionsComplete);
+
+ runOnPendingTransactionsComplete.run();
+ runOnPendingTransactionsComplete = null;
+ }
+ }
}