import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Iterables;
import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.UnaryOperator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
private final Collection<ShardDataTreeMetadata<?>> metadata;
- private final TipProducingDataTree dataTree;
+ private final DataTree dataTree;
private final String logContext;
private final Shard shard;
private Runnable runOnPendingTransactionsComplete;
/**
* Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a
- * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another
+ * {@link DataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another
* candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current
* tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself.
*/
- private TipProducingDataTreeTip tip;
+ private DataTreeTip tip;
private SchemaContext schemaContext;
private int currentTransactionBatch;
- ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
+ ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
final ShardDataTreeMetadata<?>... metadata) {
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
final ShardDataTreeMetadata<?>... metadata) {
- this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType, root),
- treeChangeListenerPublisher, dataChangeListenerPublisher, logContext, metadata);
+ this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher,
+ dataChangeListenerPublisher, logContext, metadata);
+ }
+
+ private static DataTree createDataTree(final TreeType treeType, final YangInstanceIdentifier root) {
+ final DataTreeConfiguration baseConfig = DataTreeConfiguration.getDefault(treeType);
+ return new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType())
+ .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled())
+ .setUniqueIndexes(baseConfig.isUniqueIndexEnabled())
+ .setRootPath(root)
+ .build());
}
@VisibleForTesting
return shard.ticker().read();
}
- public TipProducingDataTree getDataTree() {
+ public DataTree getDataTree() {
return dataTree;
}
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ void applyRecoverySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
applySnapshot(snapshot, this::wrapWithPruning);
}
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
+ void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException, DataValidationFailedException {
if (payload instanceof CommitTransactionPayload) {
final Entry<TransactionIdentifier, DataTreeCandidate> e =
((CommitTransactionPayload) payload).getCandidate();
}
Optional<DataTreeCandidate> readCurrentData() {
- final Optional<NormalizedNode<?, ?>> currentState =
+ final java.util.Optional<NormalizedNode<?, ?>> currentState =
dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
}
public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
- return dataTree.takeSnapshot().readNode(path);
+ return Optional.fromJavaUtil(dataTree.takeSnapshot().readNode(path));
}
DataTreeSnapshot takeSnapshot() {
LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
Exception cause;
try {
- cohort.throwCanCommitFailure();
-
tip.validate(modification);
LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
cohort.successfulCanCommit();
} catch (ConflictingModificationAppliedException e) {
LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
e.getPath());
- cause = new OptimisticLockFailedException("Optimistic lock failed.", e);
+ cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e);
} catch (DataValidationFailedException e) {
LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
e.getPath(), e);
// precondition log, it should allow us to understand what went on.
LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
dataTree);
- cause = new TransactionCommitFailedException("Data did not pass validation.", e);
+ cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
} catch (Exception e) {
LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
cause = e;
processNextPendingTransaction();
}
- private void failPreCommit(final Exception cause) {
+ private void failPreCommit(final Throwable cause) {
shard.getShardMBean().incrementFailedTransactionsCount();
pendingTransactions.poll().cohort.failedPreCommit(cause);
processNextPendingTransaction();
final DataTreeCandidateTip candidate;
try {
candidate = tip.prepare(cohort.getDataTreeModification());
- cohort.userPreCommit(candidate);
- } catch (ExecutionException | TimeoutException | RuntimeException e) {
+ } catch (RuntimeException e) {
failPreCommit(e);
return;
}
- // Set the tip of the data tree.
- tip = Verify.verifyNotNull(candidate);
+ cohort.userPreCommit(candidate, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void noop) {
+ // Set the tip of the data tree.
+ tip = Verify.verifyNotNull(candidate);
- entry.lastAccess = readTime();
+ entry.lastAccess = readTime();
- pendingTransactions.remove();
- pendingCommits.add(entry);
+ pendingTransactions.remove();
+ pendingCommits.add(entry);
- LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+ LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
- cohort.successfulPreCommit(candidate);
+ cohort.successfulPreCommit(candidate);
- processNextPendingTransaction();
+ processNextPendingTransaction();
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ failPreCommit(failure);
+ }
+ });
}
private void failCommit(final Exception cause) {
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
// FIXME: propagate journal index
- pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+ pendingFinishCommits.poll().cohort.successfulCommit(UnsignedLong.ZERO, () -> {
+ LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+ notifyListeners(candidate);
- LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
- notifyListeners(candidate);
-
- processNextPending();
+ processNextPending();
+ });
}
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
@Override
ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final Exception failure) {
- SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure);
+ final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure);
pendingTransactions.add(new CommitEntry(cohort, readTime()));
return cohort;
}
@Override
- ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
- final DataTreeModification mod) {
- SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId,
- cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+ ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+ SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
+ cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
+ COMMIT_STEP_TIMEOUT));
pendingTransactions.add(new CommitEntry(cohort, readTime()));
return cohort;
}
}
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
- void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
+ void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
+ final Function<SimpleShardDataTreeCohort, Optional<Long>> accessTimeUpdater) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = readTime();
final Queue<CommitEntry> currentQueue = !pendingFinishCommits.isEmpty() ? pendingFinishCommits :
!pendingCommits.isEmpty() ? pendingCommits : pendingTransactions;
final CommitEntry currentTx = currentQueue.peek();
- if (currentTx != null && currentTx.lastAccess + timeout < now) {
- LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
- currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
- boolean processNext = true;
- switch (currentTx.cohort.getState()) {
- case CAN_COMMIT_PENDING:
- currentQueue.remove().cohort.failedCanCommit(new TimeoutException());
- break;
- case CAN_COMMIT_COMPLETE:
- // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
- // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
- // in PRE_COMMIT_COMPLETE is changed.
- currentQueue.remove().cohort.reportFailure(new TimeoutException());
- break;
- case PRE_COMMIT_PENDING:
- currentQueue.remove().cohort.failedPreCommit(new TimeoutException());
- break;
- case PRE_COMMIT_COMPLETE:
- // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
- // are ready we should commit the transaction, not abort it. Our current software stack does
- // not allow us to do that consistently, because we persist at the time of commit, hence
- // we can end up in a state where we have pre-committed a transaction, then a leader failover
- // occurred ... the new leader does not see the pre-committed transaction and does not have
- // a running timer. To fix this we really need two persistence events.
- //
- // The first one, done at pre-commit time will hold the transaction payload. When consensus
- // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
- // apply the state in this event.
- //
- // The second one, done at commit (or abort) time holds only the transaction identifier and
- // signals to followers that the state should (or should not) be applied.
- //
- // In order to make the pre-commit timer working across failovers, though, we need
- // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
- // restart the timer.
- currentQueue.remove().cohort.reportFailure(new TimeoutException());
- break;
- case COMMIT_PENDING:
- LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
- currentTx.cohort.getIdentifier());
- currentTx.lastAccess = now;
- processNext = false;
- return;
- case ABORTED:
- case COMMITTED:
- case FAILED:
- case READY:
- default:
- currentQueue.remove();
+ if (currentTx == null) {
+ // Empty queue, no-op
+ return;
+ }
+
+ long delta = now - currentTx.lastAccess;
+ if (delta < timeout) {
+ // Not expired yet, bail
+ return;
+ }
+
+ final Optional<Long> updateOpt = accessTimeUpdater.apply(currentTx.cohort);
+ if (updateOpt.isPresent()) {
+ final long newAccess = updateOpt.get().longValue();
+ final long newDelta = now - newAccess;
+ if (newDelta < delta) {
+ LOG.debug("{}: Updated current transaction {} access time", logContext,
+ currentTx.cohort.getIdentifier());
+ currentTx.lastAccess = newAccess;
+ delta = newDelta;
}
- if (processNext) {
- processNextPending();
+ if (delta < timeout) {
+ // Not expired yet, bail
+ return;
}
}
+
+ final long deltaMillis = TimeUnit.NANOSECONDS.toMillis(delta);
+ final State state = currentTx.cohort.getState();
+
+ LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
+ currentTx.cohort.getIdentifier(), deltaMillis, state);
+ boolean processNext = true;
+ final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
+ + deltaMillis + "ms");
+
+ switch (state) {
+ case CAN_COMMIT_PENDING:
+ currentQueue.remove().cohort.failedCanCommit(cohortFailure);
+ break;
+ case CAN_COMMIT_COMPLETE:
+ // The suppression of the FindBugs "DB_DUPLICATE_SWITCH_CLAUSES" warning pertains to this clause
+ // whose code is duplicated with PRE_COMMIT_COMPLETE. The clauses aren't combined in case the code
+ // in PRE_COMMIT_COMPLETE is changed.
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
+ break;
+ case PRE_COMMIT_PENDING:
+ currentQueue.remove().cohort.failedPreCommit(cohortFailure);
+ break;
+ case PRE_COMMIT_COMPLETE:
+ // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
+ // are ready we should commit the transaction, not abort it. Our current software stack does
+ // not allow us to do that consistently, because we persist at the time of commit, hence
+ // we can end up in a state where we have pre-committed a transaction, then a leader failover
+ // occurred ... the new leader does not see the pre-committed transaction and does not have
+ // a running timer. To fix this we really need two persistence events.
+ //
+ // The first one, done at pre-commit time will hold the transaction payload. When consensus
+ // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
+ // apply the state in this event.
+ //
+ // The second one, done at commit (or abort) time holds only the transaction identifier and
+ // signals to followers that the state should (or should not) be applied.
+ //
+ // In order to make the pre-commit timer working across failovers, though, we need
+ // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
+ // restart the timer.
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
+ break;
+ case COMMIT_PENDING:
+ LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
+ currentTx.cohort.getIdentifier());
+ currentTx.lastAccess = now;
+ processNext = false;
+ return;
+ case READY:
+ currentQueue.remove().cohort.reportFailure(cohortFailure);
+ break;
+ case ABORTED:
+ case COMMITTED:
+ case FAILED:
+ default:
+ currentQueue.remove();
+ }
+
+ if (processNext) {
+ processNextPending();
+ }
}
boolean startAbort(final SimpleShardDataTreeCohort cohort) {
return false;
}
- TipProducingDataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree);
+ DataTreeTip newTip = MoreObjects.firstNonNull(first.cohort.getCandidate(), dataTree);
while (it.hasNext()) {
final CommitEntry e = it.next();
if (cohort.equals(e.cohort)) {
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private void rebaseTransactions(final Iterator<CommitEntry> iter, @Nonnull final TipProducingDataTreeTip newTip) {
+ private void rebaseTransactions(final Iterator<CommitEntry> iter, @Nonnull final DataTreeTip newTip) {
tip = Preconditions.checkNotNull(newTip);
while (iter.hasNext()) {
final SimpleShardDataTreeCohort cohort = iter.next().cohort;
try {
tip.validate(cohort.getDataTreeModification());
DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
- cohort.userPreCommit(candidate);
cohort.setNewCandidate(candidate);
tip = candidate;
- } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+ } catch (RuntimeException | DataValidationFailedException e) {
LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
cohort.reportFailure(e);
}
ShardStats getStats() {
return shard.getShardMBean();
}
+
+ Iterator<SimpleShardDataTreeCohort> cohortIterator() {
+ return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
+ e -> e.cohort).iterator();
+ }
+
+ void removeTransactionChain(final LocalHistoryIdentifier id) {
+ if (transactionChains.remove(id) != null) {
+ LOG.debug("{}: Removed transaction chain {}", logContext, id);
+ }
+ }
}