import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
// Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
public interface CohortDecorator {
- ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
+ ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
}
- private final Map<String, CohortEntry> cohortCache = new HashMap<>();
+ private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
private CohortEntry currentCohortEntry;
final ShardDataTreeCohort cohort = ready.getTransaction().ready();
final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
- cohortCache.put(ready.getTransactionID(), cohortEntry);
+ cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
return;
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
- dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
+ dataTree.newReadWriteTransaction(batched.getTransactionID()),
cohortRegistry, schema, batched.getVersion());
- cohortCache.put(batched.getTransactionID(), cohortEntry);
+ cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
}
if(log.isDebugEnabled()) {
message.getTransactionID());
final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
DataStoreVersions.CURRENT_VERSION);
- cohortCache.put(message.getTransactionID(), cohortEntry);
+ cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
if(!queueCohortEntry(cohortEntry, sender, shard)) {
protected BatchedModifications getModifications() {
if(newModifications.isEmpty() ||
newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
- newModifications.add(new BatchedModifications(from.getTransactionID(),
- from.getVersion(), from.getTransactionChainID()));
+ newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
}
return newModifications.getLast();
}
private void handleCanCommit(CohortEntry cohortEntry) {
- String transactionID = cohortEntry.getTransactionID();
-
cohortEntry.updateLastAccessTime();
if(currentCohortEntry != null) {
if(log.isDebugEnabled()) {
log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
- name, currentCohortEntry.getTransactionID(), transactionID);
+ name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
}
return;
if(log.isDebugEnabled()) {
log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
- transactionID);
+ cohortEntry.getTransactionID());
}
}
}
* @param sender the actor to which to send the response
* @param shard the transaction's shard actor
*/
- void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+ void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
// Lookup the cohort entry that was cached previously (or should have been) by
// transactionReady (via the ForwardedReadyTransaction message).
final CohortEntry cohortEntry = cohortCache.get(transactionID);
* @param shard the transaction's shard actor
* @return true if the transaction was successfully prepared, false otherwise.
*/
- boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+ boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
// Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
// this transaction.
final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
return doCommit(cohortEntry);
}
- void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+ void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
if(cohortEntry != null) {
// We don't remove the cached cohort entry here (ie pass false) in case the Tx was
if(newModifications.isEmpty() ||
newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
- cohortEntry.getClientVersion(), ""));
+ cohortEntry.getClientVersion()));
}
return newModifications.getLast();
* @return the current CohortEntry or null if the given transaction ID does not match the
* current entry.
*/
- CohortEntry getCohortEntryIfCurrent(String transactionID) {
+ CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
if(isCurrentTransaction(transactionID)) {
return currentCohortEntry;
}
return currentCohortEntry;
}
- CohortEntry getAndRemoveCohortEntry(String transactionID) {
+ CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
return cohortCache.remove(transactionID);
}
- boolean isCurrentTransaction(String transactionID) {
+ boolean isCurrentTransaction(Identifier transactionID) {
return currentCohortEntry != null &&
currentCohortEntry.getTransactionID().equals(transactionID);
}
* @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
* the cache.
*/
- void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+ void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
if(removeCohortEntry) {
cohortCache.remove(transactionID);
}