import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@Override
public String toString() {
- return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]";
+ return "CommitEntry [tx=" + cohort.transactionId() + ", state=" + cohort.getState() + "]";
}
}
@SuppressWarnings("checkstyle:IllegalCatch")
private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
- final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
- final DataTreeModification unwrapped = newModification();
- final PruningDataTreeModification mod = createPruningModification(unwrapped,
- NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().version()) > 0);
+ final var entry = payload.acquireCandidate();
+ final var unwrapped = newModification();
+ final var pruningMod = createPruningModification(unwrapped,
+ NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.streamVersion()) > 0);
- DataTreeCandidates.applyToModification(mod, entry.getValue().candidate());
- mod.ready();
+ DataTreeCandidates.applyToModification(pruningMod, entry.candidate());
+ pruningMod.ready();
LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
try {
dataTree.validate(unwrapped);
dataTree.commit(dataTree.prepare(unwrapped));
} catch (Exception e) {
- File file = new File(System.getProperty("karaf.data", "."),
+ final var file = new File(System.getProperty("karaf.data", "."),
"failed-recovery-payload-" + logContext + ".out");
DataTreeModificationOutput.toFile(file, unwrapped);
- throw new IllegalStateException(String.format(
- "%s: Failed to apply recovery payload. Modification data was written to file %s",
- logContext, file), e);
+ throw new IllegalStateException(
+ "%s: Failed to apply recovery payload. Modification data was written to file %s".formatted(
+ logContext, file),
+ e);
}
- allMetadataCommittedTransaction(entry.getKey());
+ allMetadataCommittedTransaction(entry.transactionId());
}
private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped,
final boolean uintAdapting) {
// TODO: we should be able to reuse the pruner, provided we are not reentrant
- final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
- dataSchemaContext);
+ final var pruner = ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext);
return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption())
: new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner);
}
private void applyReplicatedCandidate(final CommitTransactionPayload payload)
throws DataValidationFailedException, IOException {
- final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
- final TransactionIdentifier identifier = entry.getKey();
- LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
+ final var payloadCandidate = payload.acquireCandidate();
+ final var transactionId = payloadCandidate.transactionId();
+ LOG.debug("{}: Applying foreign transaction {}", logContext, transactionId);
- final DataTreeModification mod = newModification();
+ final var mod = newModification();
// TODO: check version here, which will enable us to perform forward-compatibility transformations
- DataTreeCandidates.applyToModification(mod, entry.getValue().candidate());
+ DataTreeCandidates.applyToModification(mod, payloadCandidate.candidate());
mod.ready();
LOG.trace("{}: Applying foreign modification {}", logContext, mod);
dataTree.validate(mod);
- final DataTreeCandidate candidate = dataTree.prepare(mod);
+ final var candidate = dataTree.prepare(mod);
dataTree.commit(candidate);
- allMetadataCommittedTransaction(identifier);
+ allMetadataCommittedTransaction(transactionId);
notifyListeners(candidate);
}
}
// make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed.
- checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue().candidate());
+ checkRootOverwrite(commit.acquireCandidate().candidate());
} else if (payload instanceof AbortTransactionPayload abort) {
if (identifier != null) {
payloadReplicationComplete(abort);
}
private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
- final CommitEntry current = pendingFinishCommits.peek();
+ final var current = pendingFinishCommits.peek();
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
allMetadataCommittedTransaction(txId);
return false;
}
- if (!current.cohort.getIdentifier().equals(txId)) {
+ final var cohortTxId = current.cohort.transactionId();
+ if (!cohortTxId.equals(txId)) {
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
- current.cohort.getIdentifier(), txId);
+ cohortTxId, txId);
allMetadataCommittedTransaction(txId);
return false;
}
}
final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
- final Optional<DataTreeCandidate> initialState,
- final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+ final Optional<DataTreeCandidate> initialState, final Consumer<Registration> onRegistration) {
treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
}
final SimpleShardDataTreeCohort cohort = entry.cohort;
final DataTreeModification modification = cohort.getDataTreeModification();
- LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Validating transaction {}", logContext, cohort.transactionId());
Exception cause;
try {
tip.validate(modification);
- LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Transaction {} validated", logContext, cohort.transactionId());
cohort.successfulCanCommit();
entry.lastAccess = readTime();
return;
} catch (ConflictingModificationAppliedException e) {
- LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
+ LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.transactionId(),
e.getPath());
cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e);
} catch (DataValidationFailedException e) {
- LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
+ LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.transactionId(),
e.getPath(), e);
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
- LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.getIdentifier(), modification);
+ LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.transactionId(), modification);
LOG.trace("{}: Current tree: {}", logContext, dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
} catch (Exception e) {
final SimpleShardDataTreeCohort cohort = entry.cohort;
if (cohort.isFailed()) {
- LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Removing failed transaction {}", logContext, cohort.transactionId());
queue.remove();
continue;
}
Collection<String> precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames());
if (precedingShardNames.isEmpty()) {
- LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.transactionId());
return;
}
LOG.debug("{}: Evaluating tx {} for canCommit - preceding participating shard names {}",
- logContext, cohort.getIdentifier(), precedingShardNames);
+ logContext, cohort.transactionId(), precedingShardNames);
final Iterator<CommitEntry> iter = pendingTransactions.iterator();
int index = -1;
int moveToIndex = -1;
if (cohort.equals(entry.cohort)) {
if (moveToIndex < 0) {
LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit",
- logContext, cohort.getIdentifier());
+ logContext, cohort.transactionId());
return;
}
LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue",
- logContext, cohort.getIdentifier(), moveToIndex);
+ logContext, cohort.transactionId(), moveToIndex);
iter.remove();
insertEntry(pendingTransactions, entry, moveToIndex);
if (!cohort.equals(pendingTransactions.peek().cohort)) {
LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit",
- logContext, cohort.getIdentifier());
+ logContext, cohort.transactionId());
return;
}
LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit",
- logContext, cohort.getIdentifier());
+ logContext, cohort.transactionId());
break;
}
if (entry.cohort.getState() != State.READY) {
LOG.debug("{}: Skipping pending transaction {} in state {}",
- logContext, entry.cohort.getIdentifier(), entry.cohort.getState());
+ logContext, entry.cohort.transactionId(), entry.cohort.getState());
continue;
}
if (precedingShardNames.equals(pendingPrecedingShardNames)) {
if (moveToIndex < 0) {
LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}",
- logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index);
+ logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), index);
moveToIndex = index;
} else {
LOG.debug(
"{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}",
- logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex);
+ logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), moveToIndex);
}
} else {
LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping",
- logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier());
+ logContext, pendingPrecedingShardNames, entry.cohort.transactionId());
}
}
}
final SimpleShardDataTreeCohort current = entry.cohort;
verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
- final TransactionIdentifier currentId = current.getIdentifier();
+ final TransactionIdentifier currentId = current.transactionId();
LOG.debug("{}: Preparing transaction {}", logContext, currentId);
final DataTreeCandidateTip candidate;
@SuppressWarnings("checkstyle:IllegalCatch")
private void finishCommit(final SimpleShardDataTreeCohort cohort) {
- final TransactionIdentifier txId = cohort.getIdentifier();
+ final TransactionIdentifier txId = cohort.transactionId();
final DataTreeCandidate candidate = cohort.getCandidate();
LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
final SimpleShardDataTreeCohort current = entry.cohort;
if (!cohort.equals(current)) {
- LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.transactionId());
return;
}
- LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
+ LOG.debug("{}: Starting commit for transaction {}", logContext, current.transactionId());
- final TransactionIdentifier txId = cohort.getIdentifier();
+ final TransactionIdentifier txId = cohort.transactionId();
final Payload payload;
try {
payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(),
final long newDelta = now - newAccess;
if (newDelta < delta) {
LOG.debug("{}: Updated current transaction {} access time", logContext,
- currentTx.cohort.getIdentifier());
+ currentTx.cohort.transactionId());
currentTx.lastAccess = newAccess;
delta = newDelta;
}
final State state = currentTx.cohort.getState();
LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
- currentTx.cohort.getIdentifier(), deltaMillis, state);
+ currentTx.cohort.transactionId(), deltaMillis, state);
boolean processNext = true;
final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
+ deltaMillis + "ms");
break;
case COMMIT_PENDING:
LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
- currentTx.cohort.getIdentifier());
+ currentTx.cohort.transactionId());
currentTx.lastAccess = now;
processNext = false;
return;
final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
pendingTransactions).iterator();
if (!it.hasNext()) {
- LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.transactionId());
return true;
}
final CommitEntry first = it.next();
if (cohort.equals(first.cohort)) {
if (cohort.getState() != State.COMMIT_PENDING) {
- LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
- cohort.getIdentifier());
+ LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.transactionId(),
+ cohort.transactionId());
it.remove();
if (cohort.getCandidate() != null) {
return true;
}
- LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.transactionId());
return false;
}
while (it.hasNext()) {
final CommitEntry e = it.next();
if (cohort.equals(e.cohort)) {
- LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: aborting queued transaction {}", logContext, cohort.transactionId());
it.remove();
if (cohort.getCandidate() != null) {
newTip = requireNonNullElse(e.cohort.getCandidate(), newTip);
}
- LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+ LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.transactionId());
return true;
}
while (iter.hasNext()) {
final SimpleShardDataTreeCohort cohort = iter.next().cohort;
if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
- LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.transactionId());
try {
tip.validate(cohort.getDataTreeModification());
} catch (DataValidationFailedException | RuntimeException e) {
- LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+ LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.transactionId(), e);
cohort.reportFailure(e);
}
} else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
- LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.transactionId());
try {
tip.validate(cohort.getDataTreeModification());
cohort.setNewCandidate(candidate);
tip = candidate;
} catch (RuntimeException | DataValidationFailedException e) {
- LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+ LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.transactionId(), e);
cohort.reportFailure(e);
}
}