import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.slf4j.Logger;
/**
// Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
public interface CohortDecorator {
- DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
+ ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
}
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
- private final DOMTransactionFactory transactionFactory;
+ private final ShardDataTree dataTree;
private final Queue<CohortEntry> queuedCohortEntries;
private ReadyTransactionReply readyTransactionReply;
- public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
+ public ShardCommitCoordinator(ShardDataTree dataTree,
long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
this.queueCapacity = queueCapacity;
this.log = log;
this.name = name;
- this.transactionFactory = transactionFactory;
+ this.dataTree = Preconditions.checkNotNull(dataTree);
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
removalListener(cacheRemovalListener).build();
CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
- transactionFactory.<DOMStoreWriteTransaction>newTransaction(
- TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
+ dataTree.newReadWriteTransaction(batched.getTransactionID(),
batched.getTransactionChainID()));
cohortCache.put(batched.getTransactionID(), cohortEntry);
}
static class CohortEntry {
private final String transactionID;
- private DOMStoreThreePhaseCommitCohort cohort;
- private final MutableCompositeModification compositeModification;
- private final DOMStoreWriteTransaction transaction;
+ private ShardDataTreeCohort cohort;
+ private final ReadWriteShardDataTreeTransaction transaction;
private ActorRef replySender;
private Shard shard;
private long lastAccessTime;
private boolean doImmediateCommit;
- CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
- this.compositeModification = new MutableCompositeModification();
- this.transaction = transaction;
+ CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
+ this.transaction = Preconditions.checkNotNull(transaction);
this.transactionID = transactionID;
}
- CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
+ CohortEntry(String transactionID, ShardDataTreeCohort cohort,
MutableCompositeModification compositeModification) {
this.transactionID = transactionID;
this.cohort = cohort;
- this.compositeModification = compositeModification;
this.transaction = null;
}
return transactionID;
}
- DOMStoreThreePhaseCommitCohort getCohort() {
+ ShardDataTreeCohort getCohort() {
return cohort;
}
- MutableCompositeModification getModification() {
- return compositeModification;
- }
-
void applyModifications(Iterable<Modification> modifications) {
- for(Modification modification: modifications) {
- compositeModification.addModification(modification);
- modification.apply(transaction);
+ for (Modification modification : modifications) {
+ modification.apply(transaction.getSnapshot());
}
}
void setShard(Shard shard) {
this.shard = shard;
}
-
- boolean hasModifications(){
- return compositeModification.getModifications().size() > 0;
- }
}
}