import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
@Override
public String toString() {
- return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]";
+ return "CommitEntry [tx=" + cohort.transactionId() + ", state=" + cohort.getState() + "]";
}
}
*/
private DataTreeTip tip;
- private SchemaContext schemaContext;
+ private EffectiveModelContext schemaContext;
private DataSchemaContextTree dataSchemaContext;
private int currentTransactionBatch;
@VisibleForTesting
public ShardDataTree(final Shard shard, final EffectiveModelContext schemaContext, final TreeType treeType) {
- this(shard, schemaContext, treeType, YangInstanceIdentifier.empty(),
+ this(shard, schemaContext, treeType, YangInstanceIdentifier.of(),
new DefaultShardDataTreeChangeListenerPublisher(""), "");
}
}
@VisibleForTesting
- final SchemaContext getSchemaContext() {
+ final EffectiveModelContext getSchemaContext() {
return schemaContext;
}
* @return A state snapshot
*/
@NonNull ShardDataTreeSnapshot takeStateSnapshot() {
- final NormalizedNode rootNode = takeSnapshot().readNode(YangInstanceIdentifier.empty()).get();
+ final NormalizedNode rootNode = takeSnapshot().readNode(YangInstanceIdentifier.of()).orElseThrow();
final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metaBuilder =
ImmutableMap.builder();
}
final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> snapshotMeta;
- if (snapshot instanceof MetadataShardDataTreeSnapshot) {
- snapshotMeta = ((MetadataShardDataTreeSnapshot) snapshot).getMetadata();
+ if (snapshot instanceof MetadataShardDataTreeSnapshot metaSnapshot) {
+ snapshotMeta = metaSnapshot.getMetadata();
} else {
snapshotMeta = ImmutableMap.of();
}
- for (ShardDataTreeMetadata<?> m : metadata) {
- final ShardDataTreeSnapshotMetadata<?> s = snapshotMeta.get(m.getSupportedType());
+ for (var m : metadata) {
+ final var s = snapshotMeta.get(m.getSupportedType());
if (s != null) {
m.applySnapshot(s);
} else {
final DataTreeModification unwrapped = newModification();
final DataTreeModification mod = wrapper.apply(unwrapped);
// delete everything first
- mod.delete(YangInstanceIdentifier.empty());
+ mod.delete(YangInstanceIdentifier.of());
- final Optional<NormalizedNode> maybeNode = snapshot.getRootNode();
- if (maybeNode.isPresent()) {
+ snapshot.getRootNode().ifPresent(rootNode -> {
// Add everything from the remote node back
- mod.write(YangInstanceIdentifier.empty(), maybeNode.get());
- }
+ mod.write(YangInstanceIdentifier.of(), rootNode);
+ });
+
mod.ready();
dataTree.validate(unwrapped);
@SuppressWarnings("checkstyle:IllegalCatch")
private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
- final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
- final DataTreeModification unwrapped = newModification();
- final PruningDataTreeModification mod = createPruningModification(unwrapped,
- NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0);
+ final var entry = payload.acquireCandidate();
+ final var unwrapped = newModification();
+ final var pruningMod = createPruningModification(unwrapped,
+ NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.streamVersion()) > 0);
- DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
- mod.ready();
+ DataTreeCandidates.applyToModification(pruningMod, entry.candidate());
+ pruningMod.ready();
LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
try {
dataTree.validate(unwrapped);
dataTree.commit(dataTree.prepare(unwrapped));
} catch (Exception e) {
- File file = new File(System.getProperty("karaf.data", "."),
+ final var file = new File(System.getProperty("karaf.data", "."),
"failed-recovery-payload-" + logContext + ".out");
DataTreeModificationOutput.toFile(file, unwrapped);
- throw new IllegalStateException(String.format(
- "%s: Failed to apply recovery payload. Modification data was written to file %s",
- logContext, file), e);
+ throw new IllegalStateException(
+ "%s: Failed to apply recovery payload. Modification data was written to file %s".formatted(
+ logContext, file),
+ e);
}
- allMetadataCommittedTransaction(entry.getKey());
+ allMetadataCommittedTransaction(entry.transactionId());
}
private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped,
final boolean uintAdapting) {
// TODO: we should be able to reuse the pruner, provided we are not reentrant
- final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
- dataSchemaContext);
+ final var pruner = ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext);
return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption())
: new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner);
}
* @throws DataValidationFailedException when the snapshot fails to apply
*/
final void applyRecoveryPayload(final @NonNull Payload payload) throws IOException {
- if (payload instanceof CommitTransactionPayload) {
- applyRecoveryCandidate((CommitTransactionPayload) payload);
- } else if (payload instanceof AbortTransactionPayload) {
- allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
- } else if (payload instanceof PurgeTransactionPayload) {
- allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
- } else if (payload instanceof CreateLocalHistoryPayload) {
- allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
- } else if (payload instanceof CloseLocalHistoryPayload) {
- allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
- } else if (payload instanceof PurgeLocalHistoryPayload) {
- allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
- } else if (payload instanceof SkipTransactionsPayload) {
- allMetadataSkipTransactions((SkipTransactionsPayload) payload);
+ if (payload instanceof CommitTransactionPayload commit) {
+ applyRecoveryCandidate(commit);
+ } else if (payload instanceof AbortTransactionPayload abort) {
+ allMetadataAbortedTransaction(abort.getIdentifier());
+ } else if (payload instanceof PurgeTransactionPayload purge) {
+ allMetadataPurgedTransaction(purge.getIdentifier());
+ } else if (payload instanceof CreateLocalHistoryPayload create) {
+ allMetadataCreatedLocalHistory(create.getIdentifier());
+ } else if (payload instanceof CloseLocalHistoryPayload close) {
+ allMetadataClosedLocalHistory(close.getIdentifier());
+ } else if (payload instanceof PurgeLocalHistoryPayload purge) {
+ allMetadataPurgedLocalHistory(purge.getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload skip) {
+ allMetadataSkipTransactions(skip);
} else {
LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
}
private void applyReplicatedCandidate(final CommitTransactionPayload payload)
throws DataValidationFailedException, IOException {
- final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
- final TransactionIdentifier identifier = entry.getKey();
- LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
+ final var payloadCandidate = payload.acquireCandidate();
+ final var transactionId = payloadCandidate.transactionId();
+ LOG.debug("{}: Applying foreign transaction {}", logContext, transactionId);
- final DataTreeModification mod = newModification();
+ final var mod = newModification();
// TODO: check version here, which will enable us to perform forward-compatibility transformations
- DataTreeCandidates.applyToModification(mod, entry.getValue().getCandidate());
+ DataTreeCandidates.applyToModification(mod, payloadCandidate.candidate());
mod.ready();
LOG.trace("{}: Applying foreign modification {}", logContext, mod);
dataTree.validate(mod);
- final DataTreeCandidate candidate = dataTree.prepare(mod);
+ final var candidate = dataTree.prepare(mod);
dataTree.commit(candidate);
- allMetadataCommittedTransaction(identifier);
+ allMetadataCommittedTransaction(transactionId);
notifyListeners(candidate);
}
* In any case, we know that this is an entry coming from replication, hence we can be sure we will not observe
* pre-Boron state -- which limits the number of options here.
*/
- if (payload instanceof CommitTransactionPayload) {
+ if (payload instanceof CommitTransactionPayload commit) {
if (identifier == null) {
- applyReplicatedCandidate((CommitTransactionPayload) payload);
+ applyReplicatedCandidate(commit);
} else {
verify(identifier instanceof TransactionIdentifier);
// if we did not track this transaction before, it means that it came from another leader and we are in
// the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to
// the local DataTree and would be lost if it was only applied via payloadReplicationComplete().
if (!payloadReplicationComplete((TransactionIdentifier) identifier)) {
- applyReplicatedCandidate((CommitTransactionPayload) payload);
+ applyReplicatedCandidate(commit);
}
}
// make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed.
- checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue()
- .getCandidate());
- } else if (payload instanceof AbortTransactionPayload) {
+ checkRootOverwrite(commit.acquireCandidate().candidate());
+ } else if (payload instanceof AbortTransactionPayload abort) {
if (identifier != null) {
- payloadReplicationComplete((AbortTransactionPayload) payload);
+ payloadReplicationComplete(abort);
}
- allMetadataAbortedTransaction(((AbortTransactionPayload) payload).getIdentifier());
- } else if (payload instanceof PurgeTransactionPayload) {
+ allMetadataAbortedTransaction(abort.getIdentifier());
+ } else if (payload instanceof PurgeTransactionPayload purge) {
if (identifier != null) {
- payloadReplicationComplete((PurgeTransactionPayload) payload);
+ payloadReplicationComplete(purge);
}
- allMetadataPurgedTransaction(((PurgeTransactionPayload) payload).getIdentifier());
- } else if (payload instanceof CloseLocalHistoryPayload) {
+ allMetadataPurgedTransaction(purge.getIdentifier());
+ } else if (payload instanceof CloseLocalHistoryPayload close) {
if (identifier != null) {
- payloadReplicationComplete((CloseLocalHistoryPayload) payload);
+ payloadReplicationComplete(close);
}
- allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
- } else if (payload instanceof CreateLocalHistoryPayload) {
+ allMetadataClosedLocalHistory(close.getIdentifier());
+ } else if (payload instanceof CreateLocalHistoryPayload create) {
if (identifier != null) {
- payloadReplicationComplete((CreateLocalHistoryPayload)payload);
+ payloadReplicationComplete(create);
}
- allMetadataCreatedLocalHistory(((CreateLocalHistoryPayload) payload).getIdentifier());
- } else if (payload instanceof PurgeLocalHistoryPayload) {
+ allMetadataCreatedLocalHistory(create.getIdentifier());
+ } else if (payload instanceof PurgeLocalHistoryPayload purge) {
if (identifier != null) {
- payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
+ payloadReplicationComplete(purge);
}
- allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
- } else if (payload instanceof SkipTransactionsPayload) {
+ allMetadataPurgedLocalHistory(purge.getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload skip) {
if (identifier != null) {
- payloadReplicationComplete((SkipTransactionsPayload)payload);
+ payloadReplicationComplete(skip);
}
- allMetadataSkipTransactions((SkipTransactionsPayload) payload);
+ allMetadataSkipTransactions(skip);
} else {
LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
}
}
// top level container ie "/"
- if (candidate.getRootPath().isEmpty()
- && candidate.getRootNode().getModificationType() == ModificationType.WRITE) {
+ if (candidate.getRootPath().isEmpty() && candidate.getRootNode().modificationType() == ModificationType.WRITE) {
LOG.debug("{}: shard root overwritten, enqueuing snapshot", logContext);
shard.self().tell(new InitiateCaptureSnapshot(), noSender());
}
}
private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
- final CommitEntry current = pendingFinishCommits.peek();
+ final var current = pendingFinishCommits.peek();
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
allMetadataCommittedTransaction(txId);
return false;
}
- if (!current.cohort.getIdentifier().equals(txId)) {
+ final var cohortTxId = current.cohort.transactionId();
+ if (!cohortTxId.equals(txId)) {
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
- current.cohort.getIdentifier(), txId);
+ cohortTxId, txId);
allMetadataCommittedTransaction(txId);
return false;
}
}
final Optional<DataTreeCandidate> readCurrentData() {
- return readNode(YangInstanceIdentifier.empty())
- .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
+ return readNode(YangInstanceIdentifier.of())
+ .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.of(), state));
}
final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
- final Optional<DataTreeCandidate> initialState,
- final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+ final Optional<DataTreeCandidate> initialState, final Consumer<Registration> onRegistration) {
treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
}
final SimpleShardDataTreeCohort cohort = entry.cohort;
final DataTreeModification modification = cohort.getDataTreeModification();
- LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Validating transaction {}", logContext, cohort.transactionId());
Exception cause;
try {
tip.validate(modification);
- LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Transaction {} validated", logContext, cohort.transactionId());
cohort.successfulCanCommit();
entry.lastAccess = readTime();
return;
} catch (ConflictingModificationAppliedException e) {
- LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
+ LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.transactionId(),
e.getPath());
cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e);
} catch (DataValidationFailedException e) {
- LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
+ LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.transactionId(),
e.getPath(), e);
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
- LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.getIdentifier(), modification);
+ LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.transactionId(), modification);
LOG.trace("{}: Current tree: {}", logContext, dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
} catch (Exception e) {
final SimpleShardDataTreeCohort cohort = entry.cohort;
if (cohort.isFailed()) {
- LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Removing failed transaction {}", logContext, cohort.transactionId());
queue.remove();
continue;
}
Collection<String> precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames());
if (precedingShardNames.isEmpty()) {
- LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.transactionId());
return;
}
LOG.debug("{}: Evaluating tx {} for canCommit - preceding participating shard names {}",
- logContext, cohort.getIdentifier(), precedingShardNames);
+ logContext, cohort.transactionId(), precedingShardNames);
final Iterator<CommitEntry> iter = pendingTransactions.iterator();
int index = -1;
int moveToIndex = -1;
if (cohort.equals(entry.cohort)) {
if (moveToIndex < 0) {
LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit",
- logContext, cohort.getIdentifier());
+ logContext, cohort.transactionId());
return;
}
LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue",
- logContext, cohort.getIdentifier(), moveToIndex);
+ logContext, cohort.transactionId(), moveToIndex);
iter.remove();
insertEntry(pendingTransactions, entry, moveToIndex);
if (!cohort.equals(pendingTransactions.peek().cohort)) {
LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit",
- logContext, cohort.getIdentifier());
+ logContext, cohort.transactionId());
return;
}
LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit",
- logContext, cohort.getIdentifier());
+ logContext, cohort.transactionId());
break;
}
if (entry.cohort.getState() != State.READY) {
LOG.debug("{}: Skipping pending transaction {} in state {}",
- logContext, entry.cohort.getIdentifier(), entry.cohort.getState());
+ logContext, entry.cohort.transactionId(), entry.cohort.getState());
continue;
}
if (precedingShardNames.equals(pendingPrecedingShardNames)) {
if (moveToIndex < 0) {
LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}",
- logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index);
+ logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), index);
moveToIndex = index;
} else {
LOG.debug(
"{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}",
- logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex);
+ logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), moveToIndex);
}
} else {
LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping",
- logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier());
+ logContext, pendingPrecedingShardNames, entry.cohort.transactionId());
}
}
}
final SimpleShardDataTreeCohort current = entry.cohort;
verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
- final TransactionIdentifier currentId = current.getIdentifier();
+ final TransactionIdentifier currentId = current.transactionId();
LOG.debug("{}: Preparing transaction {}", logContext, currentId);
final DataTreeCandidateTip candidate;
@SuppressWarnings("checkstyle:IllegalCatch")
private void finishCommit(final SimpleShardDataTreeCohort cohort) {
- final TransactionIdentifier txId = cohort.getIdentifier();
+ final TransactionIdentifier txId = cohort.transactionId();
final DataTreeCandidate candidate = cohort.getCandidate();
LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
final SimpleShardDataTreeCohort current = entry.cohort;
if (!cohort.equals(current)) {
- LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.transactionId());
return;
}
- LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
+ LOG.debug("{}: Starting commit for transaction {}", logContext, current.transactionId());
- final TransactionIdentifier txId = cohort.getIdentifier();
+ final TransactionIdentifier txId = cohort.transactionId();
final Payload payload;
try {
payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(),
final OptionalLong updateOpt = accessTimeUpdater.apply(currentTx.cohort);
if (updateOpt.isPresent()) {
- final long newAccess = updateOpt.getAsLong();
+ final long newAccess = updateOpt.orElseThrow();
final long newDelta = now - newAccess;
if (newDelta < delta) {
LOG.debug("{}: Updated current transaction {} access time", logContext,
- currentTx.cohort.getIdentifier());
+ currentTx.cohort.transactionId());
currentTx.lastAccess = newAccess;
delta = newDelta;
}
final State state = currentTx.cohort.getState();
LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
- currentTx.cohort.getIdentifier(), deltaMillis, state);
+ currentTx.cohort.transactionId(), deltaMillis, state);
boolean processNext = true;
final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
+ deltaMillis + "ms");
break;
case COMMIT_PENDING:
LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
- currentTx.cohort.getIdentifier());
+ currentTx.cohort.transactionId());
currentTx.lastAccess = now;
processNext = false;
return;
final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
pendingTransactions).iterator();
if (!it.hasNext()) {
- LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.transactionId());
return true;
}
final CommitEntry first = it.next();
if (cohort.equals(first.cohort)) {
if (cohort.getState() != State.COMMIT_PENDING) {
- LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
- cohort.getIdentifier());
+ LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.transactionId(),
+ cohort.transactionId());
it.remove();
if (cohort.getCandidate() != null) {
return true;
}
- LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.transactionId());
return false;
}
while (it.hasNext()) {
final CommitEntry e = it.next();
if (cohort.equals(e.cohort)) {
- LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: aborting queued transaction {}", logContext, cohort.transactionId());
it.remove();
if (cohort.getCandidate() != null) {
newTip = requireNonNullElse(e.cohort.getCandidate(), newTip);
}
- LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+ LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.transactionId());
return true;
}
while (iter.hasNext()) {
final SimpleShardDataTreeCohort cohort = iter.next().cohort;
if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
- LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.transactionId());
try {
tip.validate(cohort.getDataTreeModification());
} catch (DataValidationFailedException | RuntimeException e) {
- LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+ LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.transactionId(), e);
cohort.reportFailure(e);
}
} else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
- LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.transactionId());
try {
tip.validate(cohort.getDataTreeModification());
cohort.setNewCandidate(candidate);
tip = candidate;
} catch (RuntimeException | DataValidationFailedException e) {
- LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+ LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.transactionId(), e);
cohort.reportFailure(e);
}
}