}
@SuppressWarnings("checkstyle:IllegalCatch")
- private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
+ private void applyRecoveryCandidate(final DataTreeCandidate candidate) {
final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
DataTreeCandidates.applyToModification(mod, candidate);
mod.ready();
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException, DataValidationFailedException {
+ void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException {
if (payload instanceof CommitTransactionPayload) {
final Entry<TransactionIdentifier, DataTreeCandidate> e =
((CommitTransactionPayload) payload).getCandidate();
}
}
- private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+ private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign)
throws DataValidationFailedException {
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
final DataTreeCandidate candidate = dataTree.prepare(mod);
dataTree.commit(candidate);
+ allMetadataCommittedTransaction(identifier);
notifyListeners(candidate);
}
* pre-Boron state -- which limits the number of options here.
*/
if (payload instanceof CommitTransactionPayload) {
- final TransactionIdentifier txId;
if (identifier == null) {
final Entry<TransactionIdentifier, DataTreeCandidate> e =
((CommitTransactionPayload) payload).getCandidate();
- txId = e.getKey();
- applyReplicatedCandidate(txId, e.getValue());
+ applyReplicatedCandidate(e.getKey(), e.getValue());
} else {
Verify.verify(identifier instanceof TransactionIdentifier);
- txId = (TransactionIdentifier) identifier;
- payloadReplicationComplete(txId);
+ payloadReplicationComplete((TransactionIdentifier) identifier);
}
- allMetadataCommittedTransaction(txId);
} else if (payload instanceof AbortTransactionPayload) {
if (identifier != null) {
payloadReplicationComplete((AbortTransactionPayload) payload);
final CommitEntry current = pendingFinishCommits.peek();
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ allMetadataCommittedTransaction(txId);
return;
}
if (!current.cohort.getIdentifier().equals(txId)) {
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
current.cohort.getIdentifier(), txId);
+ allMetadataCommittedTransaction(txId);
return;
}
if (chain == null) {
chain = new ShardDataTreeTransactionChain(historyId, this);
transactionChains.put(historyId, chain);
- replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+ replicatePayload(historyId, CreateLocalHistoryPayload.create(
+ historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
} else if (callback != null) {
callback.run();
}
}
chain.close();
- replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+ replicatePayload(id, CloseLocalHistoryPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
/**
return;
}
- replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+ replicatePayload(id, PurgeLocalHistoryPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
Optional<DataTreeCandidate> readCurrentData() {
void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: aborting transaction {}", logContext, id);
- replicatePayload(id, AbortTransactionPayload.create(id), callback);
+ replicatePayload(id, AbortTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
@Override
void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
LOG.debug("{}: purging transaction {}", logContext, id);
- replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+ replicatePayload(id, PurgeTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
return;
}
+ allMetadataCommittedTransaction(txId);
shard.getShardMBean().incrementCommittedTransactionCount();
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
final TransactionIdentifier txId = cohort.getIdentifier();
final Payload payload;
try {
- payload = CommitTransactionPayload.create(txId, candidate);
+ payload = CommitTransactionPayload.create(txId, candidate,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
} catch (IOException e) {
LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
pendingCommits.poll().cohort.failedCommit(e);
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final java.util.Optional<SortedSet<String>> participatingShardNames) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
- cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
+ cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
COMMIT_STEP_TIMEOUT), participatingShardNames);
pendingTransactions.add(new CommitEntry(cohort, readTime()));
return cohort;