Having a public getIdentifier() member does not make carry domain
information.
Define a package-local transactionId() method serving the same purpose.
Change-Id: If60b02ea6a07b094cd655e3c50dc6ba428c263c3
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
- public TransactionIdentifier getIdentifier() {
- return delegate.getIdentifier();
+ TransactionIdentifier transactionId() {
+ return delegate.transactionId();
private CohortEntry(final ShardDataTreeCohort cohort, final short clientVersion) {
this.cohort = requireNonNull(cohort);
private CohortEntry(final ShardDataTreeCohort cohort, final short clientVersion) {
this.cohort = requireNonNull(cohort);
- transactionId = cohort.getIdentifier();
+ transactionId = cohort.transactionId();
transaction = null;
this.clientVersion = clientVersion;
}
transaction = null;
this.clientVersion = clientVersion;
}
final var it = tree.cohortIterator();
while (it.hasNext()) {
final var cohort = it.next();
final var it = tree.cohortIterator();
while (it.hasNext()) {
final var cohort = it.next();
- if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
+ final var transactionId = cohort.transactionId();
+ if (clientId.equals(transactionId.getHistoryId().getClientId())) {
if (cohort.getState() != State.COMMIT_PENDING) {
if (cohort.getState() != State.COMMIT_PENDING) {
- LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
+ LOG.debug("{}: Retiring transaction {}", persistenceId, transactionId);
- LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
- cohort.getIdentifier());
+ LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId, transactionId);
}
private OptionalLong updateAccess(final SimpleShardDataTreeCohort cohort) {
}
private OptionalLong updateAccess(final SimpleShardDataTreeCohort cohort) {
- final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
+ final FrontendIdentifier frontend = cohort.transactionId().getHistoryId().getClientId().getFrontendId();
final LeaderFrontendState state = knownFrontends.get(frontend);
if (state == null) {
// Not tell-based protocol, do nothing
final LeaderFrontendState state = knownFrontends.get(frontend);
if (state == null) {
// Not tell-based protocol, do nothing
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
}
void abortPendingTransactions(final String reason, final Shard shard) {
}
void abortPendingTransactions(final String reason, final Shard shard) {
- final Failure failure = new Failure(new RuntimeException(reason));
- Collection<ShardDataTreeCohort> pending = dataTree.getAndClearPendingTransactions();
+ final var failure = new Failure(new RuntimeException(reason));
+ final var pending = dataTree.getAndClearPendingTransactions();
log.debug("{}: Aborting {} pending queued transactions", name, pending.size());
log.debug("{}: Aborting {} pending queued transactions", name, pending.size());
- for (ShardDataTreeCohort cohort : pending) {
- CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier());
- if (cohortEntry == null) {
- continue;
- }
-
- if (cohortEntry.getReplySender() != null) {
- cohortEntry.getReplySender().tell(failure, shard.self());
+ for (var cohort : pending) {
+ final var cohortEntry = cohortCache.remove(cohort.transactionId());
+ if (cohortEntry != null) {
+ final var replySender = cohortEntry.getReplySender();
+ if (replySender != null) {
+ replySender.tell(failure, shard.self());
+ }
}
Collection<?> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
}
Collection<?> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
- final Collection<VersionedExternalizableMessage> messages = new ArrayList<>();
- for (ShardDataTreeCohort cohort : dataTree.getAndClearPendingTransactions()) {
- CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier());
+ final var messages = new ArrayList<VersionedExternalizableMessage>();
+ for (var cohort : dataTree.getAndClearPendingTransactions()) {
+ final var cohortEntry = cohortCache.remove(cohort.transactionId());
if (cohortEntry == null) {
continue;
}
if (cohortEntry == null) {
continue;
}
- final Deque<BatchedModifications> newMessages = new ArrayDeque<>();
+ final var newMessages = new ArrayDeque<BatchedModifications>();
cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
@Override
protected BatchedModifications getModifications() {
cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
@Override
protected BatchedModifications getModifications() {
- final BatchedModifications lastBatch = newMessages.peekLast();
-
+ final var lastBatch = newMessages.peekLast();
if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) {
return lastBatch;
}
// Allocate a new message
if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) {
return lastBatch;
}
// Allocate a new message
- final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionId(),
+ final var ret = new BatchedModifications(cohortEntry.getTransactionId(),
cohortEntry.getClientVersion());
newMessages.add(ret);
return ret;
}
});
cohortEntry.getClientVersion());
newMessages.add(ret);
return ret;
}
});
- final BatchedModifications last = newMessages.peekLast();
+ final var last = newMessages.peekLast();
if (last != null) {
final boolean immediate = cohortEntry.isDoImmediateCommit();
last.setDoCommitOnReady(immediate);
if (last != null) {
final boolean immediate = cohortEntry.isDoImmediateCommit();
last.setDoCommitOnReady(immediate);
@Override
public String toString() {
@Override
public String toString() {
- return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]";
+ return "CommitEntry [tx=" + cohort.transactionId() + ", state=" + cohort.getState() + "]";
}
private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
}
private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
- final CommitEntry current = pendingFinishCommits.peek();
+ final var current = pendingFinishCommits.peek();
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
allMetadataCommittedTransaction(txId);
return false;
}
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
allMetadataCommittedTransaction(txId);
return false;
}
- if (!current.cohort.getIdentifier().equals(txId)) {
+ final var cohortTxId = current.cohort.transactionId();
+ if (!cohortTxId.equals(txId)) {
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
- current.cohort.getIdentifier(), txId);
allMetadataCommittedTransaction(txId);
return false;
}
allMetadataCommittedTransaction(txId);
return false;
}
final SimpleShardDataTreeCohort cohort = entry.cohort;
final DataTreeModification modification = cohort.getDataTreeModification();
final SimpleShardDataTreeCohort cohort = entry.cohort;
final DataTreeModification modification = cohort.getDataTreeModification();
- LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Validating transaction {}", logContext, cohort.transactionId());
Exception cause;
try {
tip.validate(modification);
Exception cause;
try {
tip.validate(modification);
- LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Transaction {} validated", logContext, cohort.transactionId());
cohort.successfulCanCommit();
entry.lastAccess = readTime();
return;
} catch (ConflictingModificationAppliedException e) {
cohort.successfulCanCommit();
entry.lastAccess = readTime();
return;
} catch (ConflictingModificationAppliedException e) {
- LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
+ LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.transactionId(),
e.getPath());
cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e);
} catch (DataValidationFailedException e) {
e.getPath());
cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e);
} catch (DataValidationFailedException e) {
- LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
+ LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.transactionId(),
e.getPath(), e);
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
e.getPath(), e);
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
- LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.getIdentifier(), modification);
+ LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.transactionId(), modification);
LOG.trace("{}: Current tree: {}", logContext, dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
} catch (Exception e) {
LOG.trace("{}: Current tree: {}", logContext, dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
} catch (Exception e) {
final SimpleShardDataTreeCohort cohort = entry.cohort;
if (cohort.isFailed()) {
final SimpleShardDataTreeCohort cohort = entry.cohort;
if (cohort.isFailed()) {
- LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Removing failed transaction {}", logContext, cohort.transactionId());
queue.remove();
continue;
}
queue.remove();
continue;
}
Collection<String> precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames());
if (precedingShardNames.isEmpty()) {
Collection<String> precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames());
if (precedingShardNames.isEmpty()) {
- LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.transactionId());
return;
}
LOG.debug("{}: Evaluating tx {} for canCommit - preceding participating shard names {}",
return;
}
LOG.debug("{}: Evaluating tx {} for canCommit - preceding participating shard names {}",
- logContext, cohort.getIdentifier(), precedingShardNames);
+ logContext, cohort.transactionId(), precedingShardNames);
final Iterator<CommitEntry> iter = pendingTransactions.iterator();
int index = -1;
int moveToIndex = -1;
final Iterator<CommitEntry> iter = pendingTransactions.iterator();
int index = -1;
int moveToIndex = -1;
if (cohort.equals(entry.cohort)) {
if (moveToIndex < 0) {
LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit",
if (cohort.equals(entry.cohort)) {
if (moveToIndex < 0) {
LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit",
- logContext, cohort.getIdentifier());
+ logContext, cohort.transactionId());
return;
}
LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue",
return;
}
LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue",
- logContext, cohort.getIdentifier(), moveToIndex);
+ logContext, cohort.transactionId(), moveToIndex);
iter.remove();
insertEntry(pendingTransactions, entry, moveToIndex);
if (!cohort.equals(pendingTransactions.peek().cohort)) {
LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit",
iter.remove();
insertEntry(pendingTransactions, entry, moveToIndex);
if (!cohort.equals(pendingTransactions.peek().cohort)) {
LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit",
- logContext, cohort.getIdentifier());
+ logContext, cohort.transactionId());
return;
}
LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit",
return;
}
LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit",
- logContext, cohort.getIdentifier());
+ logContext, cohort.transactionId());
break;
}
if (entry.cohort.getState() != State.READY) {
LOG.debug("{}: Skipping pending transaction {} in state {}",
break;
}
if (entry.cohort.getState() != State.READY) {
LOG.debug("{}: Skipping pending transaction {} in state {}",
- logContext, entry.cohort.getIdentifier(), entry.cohort.getState());
+ logContext, entry.cohort.transactionId(), entry.cohort.getState());
if (precedingShardNames.equals(pendingPrecedingShardNames)) {
if (moveToIndex < 0) {
LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}",
if (precedingShardNames.equals(pendingPrecedingShardNames)) {
if (moveToIndex < 0) {
LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}",
- logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index);
+ logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), index);
moveToIndex = index;
} else {
LOG.debug(
"{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}",
moveToIndex = index;
} else {
LOG.debug(
"{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}",
- logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex);
+ logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), moveToIndex);
}
} else {
LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping",
}
} else {
LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping",
- logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier());
+ logContext, pendingPrecedingShardNames, entry.cohort.transactionId());
final SimpleShardDataTreeCohort current = entry.cohort;
verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
final SimpleShardDataTreeCohort current = entry.cohort;
verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
- final TransactionIdentifier currentId = current.getIdentifier();
+ final TransactionIdentifier currentId = current.transactionId();
LOG.debug("{}: Preparing transaction {}", logContext, currentId);
final DataTreeCandidateTip candidate;
LOG.debug("{}: Preparing transaction {}", logContext, currentId);
final DataTreeCandidateTip candidate;
@SuppressWarnings("checkstyle:IllegalCatch")
private void finishCommit(final SimpleShardDataTreeCohort cohort) {
@SuppressWarnings("checkstyle:IllegalCatch")
private void finishCommit(final SimpleShardDataTreeCohort cohort) {
- final TransactionIdentifier txId = cohort.getIdentifier();
+ final TransactionIdentifier txId = cohort.transactionId();
final DataTreeCandidate candidate = cohort.getCandidate();
LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
final DataTreeCandidate candidate = cohort.getCandidate();
LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
final SimpleShardDataTreeCohort current = entry.cohort;
if (!cohort.equals(current)) {
final SimpleShardDataTreeCohort current = entry.cohort;
if (!cohort.equals(current)) {
- LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.transactionId());
- LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
+ LOG.debug("{}: Starting commit for transaction {}", logContext, current.transactionId());
- final TransactionIdentifier txId = cohort.getIdentifier();
+ final TransactionIdentifier txId = cohort.transactionId();
final Payload payload;
try {
payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(),
final Payload payload;
try {
payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(),
final long newDelta = now - newAccess;
if (newDelta < delta) {
LOG.debug("{}: Updated current transaction {} access time", logContext,
final long newDelta = now - newAccess;
if (newDelta < delta) {
LOG.debug("{}: Updated current transaction {} access time", logContext,
- currentTx.cohort.getIdentifier());
+ currentTx.cohort.transactionId());
currentTx.lastAccess = newAccess;
delta = newDelta;
}
currentTx.lastAccess = newAccess;
delta = newDelta;
}
final State state = currentTx.cohort.getState();
LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
final State state = currentTx.cohort.getState();
LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
- currentTx.cohort.getIdentifier(), deltaMillis, state);
+ currentTx.cohort.transactionId(), deltaMillis, state);
boolean processNext = true;
final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
+ deltaMillis + "ms");
boolean processNext = true;
final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
+ deltaMillis + "ms");
break;
case COMMIT_PENDING:
LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
break;
case COMMIT_PENDING:
LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
- currentTx.cohort.getIdentifier());
+ currentTx.cohort.transactionId());
currentTx.lastAccess = now;
processNext = false;
return;
currentTx.lastAccess = now;
processNext = false;
return;
final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
pendingTransactions).iterator();
if (!it.hasNext()) {
final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
pendingTransactions).iterator();
if (!it.hasNext()) {
- LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.transactionId());
final CommitEntry first = it.next();
if (cohort.equals(first.cohort)) {
if (cohort.getState() != State.COMMIT_PENDING) {
final CommitEntry first = it.next();
if (cohort.equals(first.cohort)) {
if (cohort.getState() != State.COMMIT_PENDING) {
- LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
- cohort.getIdentifier());
+ LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.transactionId(),
+ cohort.transactionId());
it.remove();
if (cohort.getCandidate() != null) {
it.remove();
if (cohort.getCandidate() != null) {
- LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.transactionId());
while (it.hasNext()) {
final CommitEntry e = it.next();
if (cohort.equals(e.cohort)) {
while (it.hasNext()) {
final CommitEntry e = it.next();
if (cohort.equals(e.cohort)) {
- LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: aborting queued transaction {}", logContext, cohort.transactionId());
it.remove();
if (cohort.getCandidate() != null) {
it.remove();
if (cohort.getCandidate() != null) {
newTip = requireNonNullElse(e.cohort.getCandidate(), newTip);
}
newTip = requireNonNullElse(e.cohort.getCandidate(), newTip);
}
- LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+ LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.transactionId());
while (iter.hasNext()) {
final SimpleShardDataTreeCohort cohort = iter.next().cohort;
if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
while (iter.hasNext()) {
final SimpleShardDataTreeCohort cohort = iter.next().cohort;
if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
- LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.transactionId());
try {
tip.validate(cohort.getDataTreeModification());
} catch (DataValidationFailedException | RuntimeException e) {
try {
tip.validate(cohort.getDataTreeModification());
} catch (DataValidationFailedException | RuntimeException e) {
- LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+ LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.transactionId(), e);
cohort.reportFailure(e);
}
} else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
cohort.reportFailure(e);
}
} else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
- LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+ LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.transactionId());
try {
tip.validate(cohort.getDataTreeModification());
try {
tip.validate(cohort.getDataTreeModification());
cohort.setNewCandidate(candidate);
tip = candidate;
} catch (RuntimeException | DataValidationFailedException e) {
cohort.setNewCandidate(candidate);
tip = candidate;
} catch (RuntimeException | DataValidationFailedException e) {
- LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+ LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.transactionId(), e);
cohort.reportFailure(e);
}
}
cohort.reportFailure(e);
}
}
import com.google.common.util.concurrent.FutureCallback;
import java.util.Optional;
import java.util.SortedSet;
import com.google.common.util.concurrent.FutureCallback;
import java.util.Optional;
import java.util.SortedSet;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
@VisibleForTesting
import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
@VisibleForTesting
-public abstract class ShardDataTreeCohort implements Identifiable<TransactionIdentifier> {
+public abstract class ShardDataTreeCohort {
public enum State {
READY,
CAN_COMMIT_PENDING,
public enum State {
READY,
CAN_COMMIT_PENDING,
// Prevent foreign instantiation
}
// Prevent foreign instantiation
}
+ abstract @NonNull TransactionIdentifier transactionId();
+
// FIXME: This leaks internal state generated in preCommit,
// should be result of canCommit
abstract DataTreeCandidateTip getCandidate();
// FIXME: This leaks internal state generated in preCommit,
// should be result of canCommit
abstract DataTreeCandidateTip getCandidate();
}
ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
}
ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
- return toStringHelper.add("id", getIdentifier()).add("state", getState());
+ return toStringHelper.add("id", transactionId()).add("state", getState());
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletionStage;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletionStage;
+import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.common.Empty;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.common.Empty;
private final DataTreeModification transaction;
private final ShardDataTree dataTree;
private final DataTreeModification transaction;
private final ShardDataTree dataTree;
- private final TransactionIdentifier transactionId;
+ private final @NonNull TransactionIdentifier transactionId;
private final CompositeDataTreeCohort userCohorts;
private final @Nullable SortedSet<String> participatingShardNames;
private final CompositeDataTreeCohort userCohorts;
private final @Nullable SortedSet<String> participatingShardNames;
- public TransactionIdentifier getIdentifier() {
+ TransactionIdentifier transactionId() {
private void checkState(final State expected) {
Preconditions.checkState(state == expected, "State %s does not match expected state %s for %s",
private void checkState(final State expected) {
Preconditions.checkState(state == expected, "State %s does not match expected state %s for %s",
- state, expected, getIdentifier());
+ state, expected, transactionId());
- public TransactionIdentifier getIdentifier() {
- return delegate.getIdentifier();
+ TransactionIdentifier transactionId() {
+ return delegate.transactionId();
verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
- verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
+ verify(mockShard, never()).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class),
anyBoolean());
verifyNoMoreInteractions(commitCallback2);
final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
anyBoolean());
verifyNoMoreInteractions(commitCallback2);
final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
- verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
+ verify(mockShard, never()).persistPayload(eq(cohort4.transactionId()), any(CommitTransactionPayload.class),
anyBoolean());
verifyNoMoreInteractions(commitCallback4);
final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
InOrder inOrder = inOrder(mockShard);
anyBoolean());
verifyNoMoreInteractions(commitCallback4);
final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
InOrder inOrder = inOrder(mockShard);
- inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
+ inOrder.verify(mockShard).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class),
- inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
+ inOrder.verify(mockShard).persistPayload(eq(cohort2.transactionId()), any(CommitTransactionPayload.class),
eq(false));
verifyNoMoreInteractions(commitCallback1);
verifyNoMoreInteractions(commitCallback2);
final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
inOrder = inOrder(mockShard);
eq(false));
verifyNoMoreInteractions(commitCallback1);
verifyNoMoreInteractions(commitCallback2);
final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
inOrder = inOrder(mockShard);
- inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
+ inOrder.verify(mockShard).persistPayload(eq(cohort3.transactionId()), any(CommitTransactionPayload.class),
- inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
+ inOrder.verify(mockShard).persistPayload(eq(cohort4.transactionId()), any(CommitTransactionPayload.class),
eq(false));
verifyNoMoreInteractions(commitCallback3);
verifyNoMoreInteractions(commitCallback4);
eq(false));
verifyNoMoreInteractions(commitCallback3);
verifyNoMoreInteractions(commitCallback4);
// The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
cohort1.getCandidate());
// The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
cohort1.getCandidate());
- shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
- shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
- shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
- shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort1.transactionId(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort2.transactionId(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort3.transactionId(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort4.transactionId(), mockPayload);
inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3, commitCallback4);
inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3, commitCallback4);
inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
InOrder inOrder = inOrder(mockShard);
final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
InOrder inOrder = inOrder(mockShard);
- inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
+ inOrder.verify(mockShard).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class),
- inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
+ inOrder.verify(mockShard).persistPayload(eq(cohort2.transactionId()), any(CommitTransactionPayload.class),
- inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
+ inOrder.verify(mockShard).persistPayload(eq(cohort3.transactionId()), any(CommitTransactionPayload.class),
eq(false));
// The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
cohort1.getCandidate());
eq(false));
// The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
cohort1.getCandidate());
- shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
- shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
- shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort1.transactionId(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort2.transactionId(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort3.transactionId(), mockPayload);
inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
coordinatedCommit(cohort4);
InOrder inOrder = inOrder(mockShard);
coordinatedCommit(cohort4);
InOrder inOrder = inOrder(mockShard);
- inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
+ inOrder.verify(mockShard).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class),
- inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
+ inOrder.verify(mockShard).persistPayload(eq(cohort3.transactionId()), any(CommitTransactionPayload.class),
- inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
+ inOrder.verify(mockShard).persistPayload(eq(cohort4.transactionId()), any(CommitTransactionPayload.class),
eq(false));
// The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
cohort1.getCandidate());
eq(false));
// The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
cohort1.getCandidate());
- shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
- shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
- shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort1.transactionId(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort3.transactionId(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort4.transactionId(), mockPayload);
final DataTreeSnapshot snapshot =
shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
final DataTreeSnapshot snapshot =
shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();