Now that our primary user is an implementation detail, we can lock down a
few aspects of ShardDataTree operation. This will ease refactoring
further down the road.
Change-Id: I91248347889ea0ceecee9f093f54b5d8241fb39c
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
* <p>
* This class is not part of the API contract and is subject to change at any time. It is NOT thread-safe.
*/
* <p>
* This class is not part of the API contract and is subject to change at any time. It is NOT thread-safe.
*/
+@VisibleForTesting
+// non-final for mocking
public class ShardDataTree extends ShardDataTreeTransactionParent {
private static final class CommitEntry {
final SimpleShardDataTreeCohort cohort;
public class ShardDataTree extends ShardDataTreeTransactionParent {
private static final class CommitEntry {
final SimpleShardDataTreeCohort cohort;
return shard.ticker().read();
}
return shard.ticker().read();
}
- public DataTree getDataTree() {
+ final DataTree getDataTree() {
- SchemaContext getSchemaContext() {
+ @VisibleForTesting
+ final SchemaContext getSchemaContext() {
- void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
+ final void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
dataTree.setEffectiveModelContext(newSchemaContext);
this.schemaContext = newSchemaContext;
this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
dataTree.setEffectiveModelContext(newSchemaContext);
this.schemaContext = newSchemaContext;
this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
- void resetTransactionBatch() {
+ final void resetTransactionBatch() {
currentTransactionBatch = 0;
}
currentTransactionBatch = 0;
}
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ final void applySnapshot(final @NonNull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
// TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation
applySnapshot(snapshot, UnaryOperator.identity());
}
// TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation
applySnapshot(snapshot, UnaryOperator.identity());
}
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException {
+ final void applyRecoverySnapshot(final @NonNull ShardSnapshotState snapshot) throws DataValidationFailedException {
// TODO: we should be able to reuse the pruner, provided we are not reentrant
final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
dataSchemaContext);
// TODO: we should be able to reuse the pruner, provided we are not reentrant
final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
dataSchemaContext);
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
+ final void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
if (payload instanceof CommitTransactionPayload) {
applyRecoveryCandidate((CommitTransactionPayload) payload);
} else if (payload instanceof AbortTransactionPayload) {
if (payload instanceof CommitTransactionPayload) {
applyRecoveryCandidate((CommitTransactionPayload) payload);
} else if (payload instanceof AbortTransactionPayload) {
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+ final void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
DataValidationFailedException {
/*
* This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
DataValidationFailedException {
/*
* This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
* @param closed True if the chain should be created in closed state (i.e. pending purge)
* @return Transaction chain handle
*/
* @param closed True if the chain should be created in closed state (i.e. pending purge)
* @return Transaction chain handle
*/
- ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
+ final ShardDataTreeTransactionChain recreateTransactionChain(final LocalHistoryIdentifier historyId,
final boolean closed) {
final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
final boolean closed) {
final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
- ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+ final ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
final @Nullable Runnable callback) {
ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
if (chain == null) {
final @Nullable Runnable callback) {
ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
if (chain == null) {
- ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+ final ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
shard.getShardMBean().incrementReadOnlyTransactionCount();
if (txId.getHistoryId().getHistoryId() == 0) {
shard.getShardMBean().incrementReadOnlyTransactionCount();
if (txId.getHistoryId().getHistoryId() == 0) {
return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
}
return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
}
- ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+ final ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
shard.getShardMBean().incrementReadWriteTransactionCount();
if (txId.getHistoryId().getHistoryId() == 0) {
shard.getShardMBean().incrementReadWriteTransactionCount();
if (txId.getHistoryId().getHistoryId() == 0) {
- public void notifyListeners(final DataTreeCandidate candidate) {
+ final void notifyListeners(final DataTreeCandidate candidate) {
treeChangeListenerPublisher.publishChanges(candidate);
}
treeChangeListenerPublisher.publishChanges(candidate);
}
* Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
* replication callbacks.
*/
* Immediately purge all state relevant to leader. This includes all transaction chains and any scheduled
* replication callbacks.
*/
- void purgeLeaderState() {
+ final void purgeLeaderState() {
for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
chain.close();
}
for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
chain.close();
}
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
- void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
+ final void closeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
if (commonCloseTransactionChain(id, callback)) {
replicatePayload(id, CloseLocalHistoryPayload.create(id,
shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
if (commonCloseTransactionChain(id, callback)) {
replicatePayload(id, CloseLocalHistoryPayload.create(id,
shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
*
* @param id History identifier
*/
*
* @param id History identifier
*/
- void closeTransactionChain(final LocalHistoryIdentifier id) {
+ final void closeTransactionChain(final LocalHistoryIdentifier id) {
commonCloseTransactionChain(id, null);
}
commonCloseTransactionChain(id, null);
}
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
- void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
+ final void purgeTransactionChain(final LocalHistoryIdentifier id, final @Nullable Runnable callback) {
final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
if (chain == null) {
LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
if (chain == null) {
LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
- Optional<DataTreeCandidate> readCurrentData() {
+ final Optional<DataTreeCandidate> readCurrentData() {
return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
.map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
}
return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
.map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
}
- public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+ final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
final Optional<DataTreeCandidate> initialState,
final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
}
final Optional<DataTreeCandidate> initialState,
final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
}
+ final int getQueueSize() {
return pendingTransactions.size() + pendingCommits.size() + pendingFinishCommits.size();
}
@Override
return pendingTransactions.size() + pendingCommits.size() + pendingFinishCommits.size();
}
@Override
- void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+ final void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: aborting transaction {}", logContext, id);
replicatePayload(id, AbortTransactionPayload.create(
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: aborting transaction {}", logContext, id);
replicatePayload(id, AbortTransactionPayload.create(
- void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
+ final void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
// No-op for free-standing transactions
// No-op for free-standing transactions
- ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
+ final ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
final Optional<SortedSet<String>> participatingShardNames) {
final DataTreeModification snapshot = transaction.getSnapshot();
final TransactionIdentifier id = transaction.getIdentifier();
final Optional<SortedSet<String>> participatingShardNames) {
final DataTreeModification snapshot = transaction.getSnapshot();
final TransactionIdentifier id = transaction.getIdentifier();
return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
}
return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
}
- void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
+ final void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
LOG.debug("{}: purging transaction {}", logContext, id);
replicatePayload(id, PurgeTransactionPayload.create(
id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
LOG.debug("{}: purging transaction {}", logContext, id);
replicatePayload(id, PurgeTransactionPayload.create(
id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
- public Optional<NormalizedNode> readNode(final YangInstanceIdentifier path) {
+ @VisibleForTesting
+ public final Optional<NormalizedNode> readNode(final YangInstanceIdentifier path) {
return dataTree.takeSnapshot().readNode(path);
}
return dataTree.takeSnapshot().readNode(path);
}
- DataTreeSnapshot takeSnapshot() {
+ final DataTreeSnapshot takeSnapshot() {
return dataTree.takeSnapshot();
}
@VisibleForTesting
return dataTree.takeSnapshot();
}
@VisibleForTesting
- public DataTreeModification newModification() {
+ final DataTreeModification newModification() {
return dataTree.takeSnapshot().newModification();
}
return dataTree.takeSnapshot().newModification();
}
- public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
+ final Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
Collection<ShardDataTreeCohort> ret = new ArrayList<>(getQueueSize());
for (CommitEntry entry: pendingFinishCommits) {
Collection<ShardDataTreeCohort> ret = new ArrayList<>(getQueueSize());
for (CommitEntry entry: pendingFinishCommits) {
/**
* Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
*/
/**
* Called some time after {@link #processNextPendingTransaction()} decides to stop processing.
*/
- void resumeNextPendingTransaction() {
+ final void resumeNextPendingTransaction() {
LOG.debug("{}: attempting to resume transaction processing", logContext);
processNextPending();
}
LOG.debug("{}: attempting to resume transaction processing", logContext);
processNextPending();
}
return first != null && first.cohort.getState() == State.COMMIT_PENDING;
}
return first != null && first.cohort.getState() == State.COMMIT_PENDING;
}
+ // non-final for mocking
void startCanCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry head = pendingTransactions.peek();
if (head == null) {
void startCanCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry head = pendingTransactions.peek();
if (head == null) {
processNextPendingTransaction();
}
processNextPendingTransaction();
}
+ // non-final for mocking
@SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
@SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
+ // non-final for mocking
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
final CommitEntry entry = pendingCommits.peek();
checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
final CommitEntry entry = pendingCommits.peek();
checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
processNextPendingCommit();
}
processNextPendingCommit();
}
- Collection<ActorRef> getCohortActors() {
+ final Collection<ActorRef> getCohortActors() {
return cohortRegistry.getCohortActors();
}
return cohortRegistry.getCohortActors();
}
- void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
+ final void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
cohortRegistry.process(sender, message);
}
@Override
cohortRegistry.process(sender, message);
}
@Override
- ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final Exception failure) {
final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure);
pendingTransactions.add(new CommitEntry(cohort, readTime()));
final Exception failure) {
final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure);
pendingTransactions.add(new CommitEntry(cohort, readTime()));
- ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final Optional<SortedSet<String>> participatingShardNames) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
final Optional<SortedSet<String>> participatingShardNames) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
// Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
// the newReadWriteTransaction()
// Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
// the newReadWriteTransaction()
- ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final Optional<SortedSet<String>> participatingShardNames) {
if (txId.getHistoryId().getHistoryId() == 0) {
return createReadyCohort(txId, mod, participatingShardNames);
final Optional<SortedSet<String>> participatingShardNames) {
if (txId.getHistoryId().getHistoryId() == 0) {
return createReadyCohort(txId, mod, participatingShardNames);
}
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
}
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
- void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
+ final void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
final Function<SimpleShardDataTreeCohort, OptionalLong> accessTimeUpdater) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = readTime();
final Function<SimpleShardDataTreeCohort, OptionalLong> accessTimeUpdater) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = readTime();
+ // non-final for mocking
boolean startAbort(final SimpleShardDataTreeCohort cohort) {
final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
pendingTransactions).iterator();
boolean startAbort(final SimpleShardDataTreeCohort cohort) {
final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
pendingTransactions).iterator();
- void setRunOnPendingTransactionsComplete(final Runnable operation) {
+ final void setRunOnPendingTransactionsComplete(final Runnable operation) {
runOnPendingTransactionsComplete = operation;
maybeRunOperationOnPendingTransactionsComplete();
}
runOnPendingTransactionsComplete = operation;
maybeRunOperationOnPendingTransactionsComplete();
}
- ShardStats getStats() {
+ final ShardStats getStats() {
return shard.getShardMBean();
}
return shard.getShardMBean();
}
- Iterator<SimpleShardDataTreeCohort> cohortIterator() {
+ final Iterator<SimpleShardDataTreeCohort> cohortIterator() {
return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
e -> e.cohort).iterator();
}
return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
e -> e.cohort).iterator();
}
- void removeTransactionChain(final LocalHistoryIdentifier id) {
+ final void removeTransactionChain(final LocalHistoryIdentifier id) {
if (transactionChains.remove(id) != null) {
LOG.debug("{}: Removed transaction chain {}", logContext, id);
}
if (transactionChains.remove(id) != null) {
LOG.debug("{}: Removed transaction chain {}", logContext, id);
}