X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=30947fa6662b4a56d5b091cfe3133d019c9f9a24;hb=2890d49fb524bf060f7e95c83bc025df0b6980ed;hp=4ff9b5fd4353e5857ec6533c7a0cfc4ee6ec4ee2;hpb=228af4aa1ef1a802fd24e7e010f3bba959ee03dd;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 4ff9b5fd43..30947fa666 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -30,8 +30,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.slf4j.Logger; /** @@ -43,14 +41,14 @@ public class ShardCommitCoordinator { // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. public interface CohortDecorator { - DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual); + ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual); } private final Cache cohortCache; private CohortEntry currentCohortEntry; - private final DOMTransactionFactory transactionFactory; + private final ShardDataTree dataTree; private final Queue queuedCohortEntries; @@ -75,13 +73,13 @@ public class ShardCommitCoordinator { private ReadyTransactionReply readyTransactionReply; - public ShardCommitCoordinator(DOMTransactionFactory transactionFactory, + public ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) { this.queueCapacity = queueCapacity; this.log = log; this.name = name; - this.transactionFactory = transactionFactory; + this.dataTree = Preconditions.checkNotNull(dataTree); cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS). removalListener(cacheRemovalListener).build(); @@ -162,8 +160,7 @@ public class ShardCommitCoordinator { CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID()); if(cohortEntry == null) { cohortEntry = new CohortEntry(batched.getTransactionID(), - transactionFactory.newTransaction( - TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(), + dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID())); cohortCache.put(batched.getTransactionID(), cohortEntry); } @@ -415,25 +412,22 @@ public class ShardCommitCoordinator { static class CohortEntry { private final String transactionID; - private DOMStoreThreePhaseCommitCohort cohort; - private final MutableCompositeModification compositeModification; - private final DOMStoreWriteTransaction transaction; + private ShardDataTreeCohort cohort; + private final ReadWriteShardDataTreeTransaction transaction; private ActorRef replySender; private Shard shard; private long lastAccessTime; private boolean doImmediateCommit; - CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) { - this.compositeModification = new MutableCompositeModification(); - this.transaction = transaction; + CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { + this.transaction = Preconditions.checkNotNull(transaction); this.transactionID = transactionID; } - CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort, + CohortEntry(String transactionID, ShardDataTreeCohort cohort, MutableCompositeModification compositeModification) { this.transactionID = transactionID; this.cohort = cohort; - this.compositeModification = compositeModification; this.transaction = null; } @@ -449,18 +443,13 @@ public class ShardCommitCoordinator { return transactionID; } - DOMStoreThreePhaseCommitCohort getCohort() { + ShardDataTreeCohort getCohort() { return cohort; } - MutableCompositeModification getModification() { - return compositeModification; - } - void applyModifications(Iterable modifications) { - for(Modification modification: modifications) { - compositeModification.addModification(modification); - modification.apply(transaction); + for (Modification modification : modifications) { + modification.apply(transaction.getSnapshot()); } } @@ -500,9 +489,5 @@ public class ShardCommitCoordinator { void setShard(Shard shard) { this.shard = shard; } - - boolean hasModifications(){ - return compositeModification.getModifications().size() > 0; - } } }