From: Robert Varga Date: Sun, 31 Dec 2023 00:12:30 +0000 (+0100) Subject: ShardDataTreeCohort should not be identifiable X-Git-Tag: v9.0.0~53 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f4e38b536e68d2338a9a2180284c4981f0123d98 ShardDataTreeCohort should not be identifiable Having a public getIdentifier() member does not make carry domain information. Define a package-local transactionId() method serving the same purpose. Change-Id: If60b02ea6a07b094cd655e3c50dc6ba428c263c3 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java index ed82bd843a..e3bb074bdc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java @@ -53,8 +53,8 @@ final class ChainedCommitCohort extends ShardDataTreeCohort { } @Override - public TransactionIdentifier getIdentifier() { - return delegate.getIdentifier(); + TransactionIdentifier transactionId() { + return delegate.transactionId(); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java index 792aacc3d5..4260ff733b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java @@ -46,7 +46,7 @@ final class CohortEntry { private CohortEntry(final ShardDataTreeCohort cohort, final short clientVersion) { this.cohort = requireNonNull(cohort); - transactionId = cohort.getIdentifier(); + transactionId = cohort.transactionId(); transaction = null; this.clientVersion = clientVersion; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index 05a77fffef..916cb75f5a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -297,13 +297,13 @@ abstract sealed class LeaderFrontendState implements Identifiable pending = dataTree.getAndClearPendingTransactions(); + final var failure = new Failure(new RuntimeException(reason)); + final var pending = dataTree.getAndClearPendingTransactions(); log.debug("{}: Aborting {} pending queued transactions", name, pending.size()); - for (ShardDataTreeCohort cohort : pending) { - CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier()); - if (cohortEntry == null) { - continue; - } - - if (cohortEntry.getReplySender() != null) { - cohortEntry.getReplySender().tell(failure, shard.self()); + for (var cohort : pending) { + final var cohortEntry = cohortCache.remove(cohort.transactionId()); + if (cohortEntry != null) { + final var replySender = cohortEntry.getReplySender(); + if (replySender != null) { + replySender.tell(failure, shard.self()); + } } } @@ -420,32 +418,31 @@ final class ShardCommitCoordinator { } Collection convertPendingTransactionsToMessages(final int maxModificationsPerBatch) { - final Collection messages = new ArrayList<>(); - for (ShardDataTreeCohort cohort : dataTree.getAndClearPendingTransactions()) { - CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier()); + final var messages = new ArrayList(); + for (var cohort : dataTree.getAndClearPendingTransactions()) { + final var cohortEntry = cohortCache.remove(cohort.transactionId()); if (cohortEntry == null) { continue; } - final Deque newMessages = new ArrayDeque<>(); + final var newMessages = new ArrayDeque(); cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() { @Override protected BatchedModifications getModifications() { - final BatchedModifications lastBatch = newMessages.peekLast(); - + final var lastBatch = newMessages.peekLast(); if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) { return lastBatch; } // Allocate a new message - final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionId(), + final var ret = new BatchedModifications(cohortEntry.getTransactionId(), cohortEntry.getClientVersion()); newMessages.add(ret); return ret; } }); - final BatchedModifications last = newMessages.peekLast(); + final var last = newMessages.peekLast(); if (last != null) { final boolean immediate = cohortEntry.isDoImmediateCommit(); last.setDoCommitOnReady(immediate); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 9c1e856d76..bfbedcec98 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -120,7 +120,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @Override public String toString() { - return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]"; + return "CommitEntry [tx=" + cohort.transactionId() + ", state=" + cohort.getState() + "]"; } } @@ -522,16 +522,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } private boolean payloadReplicationComplete(final TransactionIdentifier txId) { - final CommitEntry current = pendingFinishCommits.peek(); + final var current = pendingFinishCommits.peek(); if (current == null) { LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId); allMetadataCommittedTransaction(txId); return false; } - if (!current.cohort.getIdentifier().equals(txId)) { + final var cohortTxId = current.cohort.transactionId(); + if (!cohortTxId.equals(txId)) { LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext, - current.cohort.getIdentifier(), txId); + cohortTxId, txId); allMetadataCommittedTransaction(txId); return false; } @@ -832,25 +833,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final SimpleShardDataTreeCohort cohort = entry.cohort; final DataTreeModification modification = cohort.getDataTreeModification(); - LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: Validating transaction {}", logContext, cohort.transactionId()); Exception cause; try { tip.validate(modification); - LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier()); + LOG.debug("{}: Transaction {} validated", logContext, cohort.transactionId()); cohort.successfulCanCommit(); entry.lastAccess = readTime(); return; } catch (ConflictingModificationAppliedException e) { - LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(), + LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.transactionId(), e.getPath()); cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e); } catch (DataValidationFailedException e) { - LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(), + LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.transactionId(), e.getPath(), e); // For debugging purposes, allow dumping of the modification. Coupled with the above // precondition log, it should allow us to understand what went on. - LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.getIdentifier(), modification); + LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.transactionId(), modification); LOG.trace("{}: Current tree: {}", logContext, dataTree); cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e); } catch (Exception e) { @@ -875,7 +876,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final SimpleShardDataTreeCohort cohort = entry.cohort; if (cohort.isFailed()) { - LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: Removing failed transaction {}", logContext, cohort.transactionId()); queue.remove(); continue; } @@ -921,12 +922,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { Collection precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames()); if (precedingShardNames.isEmpty()) { - LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier()); + LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.transactionId()); return; } LOG.debug("{}: Evaluating tx {} for canCommit - preceding participating shard names {}", - logContext, cohort.getIdentifier(), precedingShardNames); + logContext, cohort.transactionId(), precedingShardNames); final Iterator iter = pendingTransactions.iterator(); int index = -1; int moveToIndex = -1; @@ -937,29 +938,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { if (cohort.equals(entry.cohort)) { if (moveToIndex < 0) { LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit", - logContext, cohort.getIdentifier()); + logContext, cohort.transactionId()); return; } LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue", - logContext, cohort.getIdentifier(), moveToIndex); + logContext, cohort.transactionId(), moveToIndex); iter.remove(); insertEntry(pendingTransactions, entry, moveToIndex); if (!cohort.equals(pendingTransactions.peek().cohort)) { LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit", - logContext, cohort.getIdentifier()); + logContext, cohort.transactionId()); return; } LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit", - logContext, cohort.getIdentifier()); + logContext, cohort.transactionId()); break; } if (entry.cohort.getState() != State.READY) { LOG.debug("{}: Skipping pending transaction {} in state {}", - logContext, entry.cohort.getIdentifier(), entry.cohort.getState()); + logContext, entry.cohort.transactionId(), entry.cohort.getState()); continue; } @@ -969,16 +970,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { if (precedingShardNames.equals(pendingPrecedingShardNames)) { if (moveToIndex < 0) { LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}", - logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index); + logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), index); moveToIndex = index; } else { LOG.debug( "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}", - logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex); + logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), moveToIndex); } } else { LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping", - logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier()); + logContext, pendingPrecedingShardNames, entry.cohort.transactionId()); } } } @@ -1024,7 +1025,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final SimpleShardDataTreeCohort current = entry.cohort; verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current); - final TransactionIdentifier currentId = current.getIdentifier(); + final TransactionIdentifier currentId = current.transactionId(); LOG.debug("{}: Preparing transaction {}", logContext, currentId); final DataTreeCandidateTip candidate; @@ -1069,7 +1070,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { @SuppressWarnings("checkstyle:IllegalCatch") private void finishCommit(final SimpleShardDataTreeCohort cohort) { - final TransactionIdentifier txId = cohort.getIdentifier(); + final TransactionIdentifier txId = cohort.transactionId(); final DataTreeCandidate candidate = cohort.getCandidate(); LOG.debug("{}: Resuming commit of transaction {}", logContext, txId); @@ -1107,13 +1108,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final SimpleShardDataTreeCohort current = entry.cohort; if (!cohort.equals(current)) { - LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.getIdentifier()); + LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.transactionId()); return; } - LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier()); + LOG.debug("{}: Starting commit for transaction {}", logContext, current.transactionId()); - final TransactionIdentifier txId = cohort.getIdentifier(); + final TransactionIdentifier txId = cohort.transactionId(); final Payload payload; try { payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(), @@ -1218,7 +1219,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final long newDelta = now - newAccess; if (newDelta < delta) { LOG.debug("{}: Updated current transaction {} access time", logContext, - currentTx.cohort.getIdentifier()); + currentTx.cohort.transactionId()); currentTx.lastAccess = newAccess; delta = newDelta; } @@ -1233,7 +1234,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final State state = currentTx.cohort.getState(); LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext, - currentTx.cohort.getIdentifier(), deltaMillis, state); + currentTx.cohort.transactionId(), deltaMillis, state); boolean processNext = true; final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after " + deltaMillis + "ms"); @@ -1273,7 +1274,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { break; case COMMIT_PENDING: LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext, - currentTx.cohort.getIdentifier()); + currentTx.cohort.transactionId()); currentTx.lastAccess = now; processNext = false; return; @@ -1297,7 +1298,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final Iterator it = Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions).iterator(); if (!it.hasNext()) { - LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.transactionId()); return true; } @@ -1305,8 +1306,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final CommitEntry first = it.next(); if (cohort.equals(first.cohort)) { if (cohort.getState() != State.COMMIT_PENDING) { - LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(), - cohort.getIdentifier()); + LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.transactionId(), + cohort.transactionId()); it.remove(); if (cohort.getCandidate() != null) { @@ -1317,7 +1318,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { return true; } - LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier()); + LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.transactionId()); return false; } @@ -1325,7 +1326,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { while (it.hasNext()) { final CommitEntry e = it.next(); if (cohort.equals(e.cohort)) { - LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: aborting queued transaction {}", logContext, cohort.transactionId()); it.remove(); if (cohort.getCandidate() != null) { @@ -1338,7 +1339,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { newTip = requireNonNullElse(e.cohort.getCandidate(), newTip); } - LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier()); + LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.transactionId()); return true; } @@ -1348,16 +1349,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { while (iter.hasNext()) { final SimpleShardDataTreeCohort cohort = iter.next().cohort; if (cohort.getState() == State.CAN_COMMIT_COMPLETE) { - LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.transactionId()); try { tip.validate(cohort.getDataTreeModification()); } catch (DataValidationFailedException | RuntimeException e) { - LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e); + LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.transactionId(), e); cohort.reportFailure(e); } } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) { - LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier()); + LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.transactionId()); try { tip.validate(cohort.getDataTreeModification()); @@ -1366,7 +1367,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { cohort.setNewCandidate(candidate); tip = candidate; } catch (RuntimeException | DataValidationFailedException e) { - LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e); + LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.transactionId(), e); cohort.reportFailure(e); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java index 77b8db9209..03cc77f0e0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -14,15 +14,15 @@ import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; import java.util.Optional; import java.util.SortedSet; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; @VisibleForTesting -public abstract class ShardDataTreeCohort implements Identifiable { +public abstract class ShardDataTreeCohort { public enum State { READY, CAN_COMMIT_PENDING, @@ -40,6 +40,8 @@ public abstract class ShardDataTreeCohort implements Identifiable participatingShardNames; @@ -61,7 +62,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { } @Override - public TransactionIdentifier getIdentifier() { + TransactionIdentifier transactionId() { return transactionId; } @@ -82,7 +83,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { private void checkState(final State expected) { Preconditions.checkState(state == expected, "State %s does not match expected state %s for %s", - state, expected, getIdentifier()); + state, expected, transactionId()); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 260a6bbdc4..ab95f7028a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -470,8 +470,8 @@ public abstract class AbstractShardTest extends AbstractActorTest { } @Override - public TransactionIdentifier getIdentifier() { - return delegate.getIdentifier(); + TransactionIdentifier transactionId() { + return delegate.transactionId(); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java index f900ec7c05..ef6b2448f7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTest.java @@ -264,29 +264,29 @@ public class ShardDataTreeTest extends AbstractTest { verify(preCommitCallback4).onSuccess(cohort4.getCandidate()); final FutureCallback commitCallback2 = coordinatedCommit(cohort2); - verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), + verify(mockShard, never()).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class), anyBoolean()); verifyNoMoreInteractions(commitCallback2); final FutureCallback commitCallback4 = coordinatedCommit(cohort4); - verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class), + verify(mockShard, never()).persistPayload(eq(cohort4.transactionId()), any(CommitTransactionPayload.class), anyBoolean()); verifyNoMoreInteractions(commitCallback4); final FutureCallback commitCallback1 = coordinatedCommit(cohort1); InOrder inOrder = inOrder(mockShard); - inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), + inOrder.verify(mockShard).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class), eq(true)); - inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class), + inOrder.verify(mockShard).persistPayload(eq(cohort2.transactionId()), any(CommitTransactionPayload.class), eq(false)); verifyNoMoreInteractions(commitCallback1); verifyNoMoreInteractions(commitCallback2); final FutureCallback commitCallback3 = coordinatedCommit(cohort3); inOrder = inOrder(mockShard); - inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class), + inOrder.verify(mockShard).persistPayload(eq(cohort3.transactionId()), any(CommitTransactionPayload.class), eq(true)); - inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class), + inOrder.verify(mockShard).persistPayload(eq(cohort4.transactionId()), any(CommitTransactionPayload.class), eq(false)); verifyNoMoreInteractions(commitCallback3); verifyNoMoreInteractions(commitCallback4); @@ -298,10 +298,10 @@ public class ShardDataTreeTest extends AbstractTest { // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload. CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(), cohort1.getCandidate()); - shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload); - shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload); - shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload); - shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort1.transactionId(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort2.transactionId(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort3.transactionId(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort4.transactionId(), mockPayload); inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3, commitCallback4); inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class)); @@ -334,19 +334,19 @@ public class ShardDataTreeTest extends AbstractTest { final FutureCallback commitCallback1 = immediate3PhaseCommit(cohort1); InOrder inOrder = inOrder(mockShard); - inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), + inOrder.verify(mockShard).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class), eq(true)); - inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class), + inOrder.verify(mockShard).persistPayload(eq(cohort2.transactionId()), any(CommitTransactionPayload.class), eq(true)); - inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class), + inOrder.verify(mockShard).persistPayload(eq(cohort3.transactionId()), any(CommitTransactionPayload.class), eq(false)); // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload. CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(), cohort1.getCandidate()); - shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload); - shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload); - shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort1.transactionId(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort2.transactionId(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort3.transactionId(), mockPayload); inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3); inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class)); @@ -422,19 +422,19 @@ public class ShardDataTreeTest extends AbstractTest { coordinatedCommit(cohort4); InOrder inOrder = inOrder(mockShard); - inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), + inOrder.verify(mockShard).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class), eq(false)); - inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class), + inOrder.verify(mockShard).persistPayload(eq(cohort3.transactionId()), any(CommitTransactionPayload.class), eq(false)); - inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class), + inOrder.verify(mockShard).persistPayload(eq(cohort4.transactionId()), any(CommitTransactionPayload.class), eq(false)); // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload. CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(), cohort1.getCandidate()); - shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload); - shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload); - shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort1.transactionId(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort3.transactionId(), mockPayload); + shardDataTree.applyReplicatedPayload(cohort4.transactionId(), mockPayload); final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();