import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
import akka.actor.ActorRef;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import akka.actor.ActorRef;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
public class ShardDataTree extends ShardDataTreeTransactionParent {
private static final class CommitEntry {
final SimpleShardDataTreeCohort cohort;
public class ShardDataTree extends ShardDataTreeTransactionParent {
private static final class CommitEntry {
final SimpleShardDataTreeCohort cohort;
dataTree.setEffectiveModelContext(newSchemaContext);
this.schemaContext = newSchemaContext;
this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
dataTree.setEffectiveModelContext(newSchemaContext);
this.schemaContext = newSchemaContext;
this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
if (maybeNode.isPresent()) {
// Add everything from the remote node back
mod.write(YangInstanceIdentifier.empty(), maybeNode.get());
if (maybeNode.isPresent()) {
// Add everything from the remote node back
mod.write(YangInstanceIdentifier.empty(), maybeNode.get());
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
// TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation
applySnapshot(snapshot, UnaryOperator.identity());
}
// TODO: we should be taking ShardSnapshotState here and performing forward-compatibility translation
applySnapshot(snapshot, UnaryOperator.identity());
}
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
// TODO: we should be able to reuse the pruner, provided we are not reentrant
final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
dataSchemaContext);
// TODO: we should be able to reuse the pruner, provided we are not reentrant
final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
dataSchemaContext);
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
if (payload instanceof CommitTransactionPayload) {
applyRecoveryCandidate((CommitTransactionPayload) payload);
} else if (payload instanceof AbortTransactionPayload) {
if (payload instanceof CommitTransactionPayload) {
applyRecoveryCandidate((CommitTransactionPayload) payload);
} else if (payload instanceof AbortTransactionPayload) {
private void applyReplicatedCandidate(final CommitTransactionPayload payload)
throws DataValidationFailedException, IOException {
private void applyReplicatedCandidate(final CommitTransactionPayload payload)
throws DataValidationFailedException, IOException {
final TransactionIdentifier identifier = entry.getKey();
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
final TransactionIdentifier identifier = entry.getKey();
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
+ final void applyReplicatedPayload(final Identifier identifier, final Payload payload) throws IOException,
DataValidationFailedException {
/*
* This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
DataValidationFailedException {
/*
* This is a bit more involved than it needs to be due to to the fact we do not want to be touching the payload
final DatastoreContext datastoreContext = shard.getDatastoreContext();
if (!datastoreContext.isSnapshotOnRootOverwrite()) {
return;
}
if (!datastoreContext.isPersistent()) {
final DatastoreContext datastoreContext = shard.getDatastoreContext();
if (!datastoreContext.isSnapshotOnRootOverwrite()) {
return;
}
if (!datastoreContext.isPersistent()) {
- if ((candidate.getRootPath().equals(YangInstanceIdentifier.empty())
- && candidate.getRootNode().getModificationType().equals(ModificationType.WRITE))) {
+ if (candidate.getRootPath().isEmpty()
+ && candidate.getRootNode().getModificationType() == ModificationType.WRITE) {
LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext);
shard.self().tell(new InitiateCaptureSnapshot(), noSender());
LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext);
shard.self().tell(new InitiateCaptureSnapshot(), noSender());
* @param closed True if the chain should be created in closed state (i.e. pending purge)
* @return Transaction chain handle
*/
* @param closed True if the chain should be created in closed state (i.e. pending purge)
* @return Transaction chain handle
*/
final boolean closed) {
final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
final boolean closed) {
final ShardDataTreeTransactionChain ret = new ShardDataTreeTransactionChain(historyId, this);
final ShardDataTreeTransactionChain existing = transactionChains.putIfAbsent(historyId, ret);
final @Nullable Runnable callback) {
ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
if (chain == null) {
final @Nullable Runnable callback) {
ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
if (chain == null) {
shard.getShardMBean().incrementReadOnlyTransactionCount();
if (txId.getHistoryId().getHistoryId() == 0) {
shard.getShardMBean().incrementReadOnlyTransactionCount();
if (txId.getHistoryId().getHistoryId() == 0) {
shard.getShardMBean().incrementReadWriteTransactionCount();
if (txId.getHistoryId().getHistoryId() == 0) {
shard.getShardMBean().incrementReadWriteTransactionCount();
if (txId.getHistoryId().getHistoryId() == 0) {
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
if (commonCloseTransactionChain(id, callback)) {
replicatePayload(id, CloseLocalHistoryPayload.create(id,
shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
if (commonCloseTransactionChain(id, callback)) {
replicatePayload(id, CloseLocalHistoryPayload.create(id,
shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
* @param id History identifier
* @param callback Callback to invoke upon completion, may be null
*/
final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
if (chain == null) {
LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
final ShardDataTreeTransactionChain chain = transactionChains.remove(id);
if (chain == null) {
LOG.debug("{}: Purging non-existent transaction chain {}", logContext, id);
return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
.map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
}
return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
.map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
}
- public void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
+ final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
final Optional<DataTreeCandidate> initialState,
final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
}
final Optional<DataTreeCandidate> initialState,
final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
}
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: aborting transaction {}", logContext, id);
replicatePayload(id, AbortTransactionPayload.create(
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: aborting transaction {}", logContext, id);
replicatePayload(id, AbortTransactionPayload.create(
final Optional<SortedSet<String>> participatingShardNames) {
final DataTreeModification snapshot = transaction.getSnapshot();
final TransactionIdentifier id = transaction.getIdentifier();
final Optional<SortedSet<String>> participatingShardNames) {
final DataTreeModification snapshot = transaction.getSnapshot();
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: purging transaction {}", logContext, id);
replicatePayload(id, PurgeTransactionPayload.create(
id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
LOG.debug("{}: purging transaction {}", logContext, id);
replicatePayload(id, PurgeTransactionPayload.create(
id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
Collection<ShardDataTreeCohort> ret = new ArrayList<>(getQueueSize());
for (CommitEntry entry: pendingFinishCommits) {
Collection<ShardDataTreeCohort> ret = new ArrayList<>(getQueueSize());
for (CommitEntry entry: pendingFinishCommits) {
void startCanCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry head = pendingTransactions.peek();
if (head == null) {
void startCanCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry head = pendingTransactions.peek();
if (head == null) {
@SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
@SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
final CommitEntry entry = pendingCommits.peek();
checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
final CommitEntry entry = pendingCommits.peek();
checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
- ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final Exception failure) {
final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure);
pendingTransactions.add(new CommitEntry(cohort, readTime()));
final Exception failure) {
final SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId, failure);
pendingTransactions.add(new CommitEntry(cohort, readTime()));
- ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final Optional<SortedSet<String>> participatingShardNames) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
final Optional<SortedSet<String>> participatingShardNames) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
// Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
// the newReadWriteTransaction()
// Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
// the newReadWriteTransaction()
- ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final Optional<SortedSet<String>> participatingShardNames) {
if (txId.getHistoryId().getHistoryId() == 0) {
return createReadyCohort(txId, mod, participatingShardNames);
final Optional<SortedSet<String>> participatingShardNames) {
if (txId.getHistoryId().getHistoryId() == 0) {
return createReadyCohort(txId, mod, participatingShardNames);
final Function<SimpleShardDataTreeCohort, OptionalLong> accessTimeUpdater) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = readTime();
final Function<SimpleShardDataTreeCohort, OptionalLong> accessTimeUpdater) {
final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
final long now = readTime();
boolean startAbort(final SimpleShardDataTreeCohort cohort) {
final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
pendingTransactions).iterator();
boolean startAbort(final SimpleShardDataTreeCohort cohort) {
final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
pendingTransactions).iterator();
return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
e -> e.cohort).iterator();
}
return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
e -> e.cohort).iterator();
}
if (transactionChains.remove(id) != null) {
LOG.debug("{}: Removed transaction chain {}", logContext, id);
}
if (transactionChains.remove(id) != null) {
LOG.debug("{}: Removed transaction chain {}", logContext, id);
}