import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
/**
* Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
}
}
- private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+ private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(FiniteDuration.create(5, TimeUnit.SECONDS));
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
/**
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
+ private void applyRecoveryCandidate(final DataTreeCandidate candidate) {
final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
DataTreeCandidates.applyToModification(mod, candidate);
mod.ready();
 * @throws IOException when the snapshot fails to deserialize
- * @throws DataValidationFailedException when the snapshot fails to apply
 */
- void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException, DataValidationFailedException {
+ void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException {
if (payload instanceof CommitTransactionPayload) {
final Entry<TransactionIdentifier, DataTreeCandidate> e =
((CommitTransactionPayload) payload).getCandidate();
}
}
- private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+ private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign)
throws DataValidationFailedException {
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
final DataTreeCandidate candidate = dataTree.prepare(mod);
dataTree.commit(candidate);
+ allMetadataCommittedTransaction(identifier);
notifyListeners(candidate);
}
* pre-Boron state -- which limits the number of options here.
*/
if (payload instanceof CommitTransactionPayload) {
- final TransactionIdentifier txId;
if (identifier == null) {
final Entry<TransactionIdentifier, DataTreeCandidate> e =
((CommitTransactionPayload) payload).getCandidate();
- txId = e.getKey();
- applyReplicatedCandidate(txId, e.getValue());
+ applyReplicatedCandidate(e.getKey(), e.getValue());
} else {
Verify.verify(identifier instanceof TransactionIdentifier);
- txId = (TransactionIdentifier) identifier;
- payloadReplicationComplete(txId);
+ payloadReplicationComplete((TransactionIdentifier) identifier);
}
- allMetadataCommittedTransaction(txId);
} else if (payload instanceof AbortTransactionPayload) {
if (identifier != null) {
payloadReplicationComplete((AbortTransactionPayload) payload);
final CommitEntry current = pendingFinishCommits.peek();
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ allMetadataCommittedTransaction(txId);
return;
}
if (!current.cohort.getIdentifier().equals(txId)) {
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
current.cohort.getIdentifier(), txId);
+ allMetadataCommittedTransaction(txId);
return;
}
if (chain == null) {
chain = new ShardDataTreeTransactionChain(historyId, this);
transactionChains.put(historyId, chain);
- replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+ replicatePayload(historyId, CreateLocalHistoryPayload.create(
+ historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
} else if (callback != null) {
callback.run();
}
}
chain.close();
- replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+ replicatePayload(id, CloseLocalHistoryPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
/**
return;
}
- replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+ replicatePayload(id, PurgeLocalHistoryPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
Optional<DataTreeCandidate> readCurrentData() {
void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
final TransactionIdentifier id = transaction.getIdentifier();
LOG.debug("{}: aborting transaction {}", logContext, id);
- replicatePayload(id, AbortTransactionPayload.create(id), callback);
+ replicatePayload(id, AbortTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
@Override
void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
LOG.debug("{}: purging transaction {}", logContext, id);
- replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+ replicatePayload(id, PurgeTransactionPayload.create(
+ id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
- LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
- dataTree);
+ LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", logContext, cohort.getIdentifier(),
+ modification, dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
} catch (Exception e) {
LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
processNextPendingTransaction();
}
- private void insertEntry(Deque<CommitEntry> queue, CommitEntry entry, int atIndex) {
+ private static void insertEntry(final Deque<CommitEntry> queue, final CommitEntry entry, final int atIndex) {
if (atIndex == 0) {
queue.addFirst(entry);
return;
}
private Collection<String> extractPrecedingShardNames(
- java.util.Optional<SortedSet<String>> participatingShardNames) {
+ final java.util.Optional<SortedSet<String>> participatingShardNames) {
return participatingShardNames.map((Function<SortedSet<String>, Collection<String>>)
set -> set.headSet(shard.getShardName())).orElse(Collections.<String>emptyList());
}
return;
}
+ allMetadataCommittedTransaction(txId);
shard.getShardMBean().incrementCommittedTransactionCount();
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
final TransactionIdentifier txId = cohort.getIdentifier();
final Payload payload;
try {
- payload = CommitTransactionPayload.create(txId, candidate);
+ payload = CommitTransactionPayload.create(txId, candidate,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
} catch (IOException e) {
LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
pendingCommits.poll().cohort.failedCommit(e);
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
final java.util.Optional<SortedSet<String>> participatingShardNames) {
SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
- cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
+ cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
COMMIT_STEP_TIMEOUT), participatingShardNames);
pendingTransactions.add(new CommitEntry(cohort, readTime()));
return cohort;