X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=0eb48fd180c169eba511cac1272eb87ab227b819;hb=ccfab2e4f9599c13c58d9fa62a3275b17d3d6caa;hp=4ff9b5fd4353e5857ec6533c7a0cfc4ee6ec4ee2;hpb=228af4aa1ef1a802fd24e7e010f3bba959ee03dd;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 4ff9b5fd43..0eb48fd180 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -31,7 +31,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.slf4j.Logger; /** @@ -50,7 +49,7 @@ public class ShardCommitCoordinator { private CohortEntry currentCohortEntry; - private final DOMTransactionFactory transactionFactory; + private final ShardDataTree dataTree; private final Queue queuedCohortEntries; @@ -75,13 +74,13 @@ public class ShardCommitCoordinator { private ReadyTransactionReply readyTransactionReply; - public ShardCommitCoordinator(DOMTransactionFactory transactionFactory, + public ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) { this.queueCapacity = queueCapacity; this.log = log; this.name = name; - this.transactionFactory = transactionFactory; + this.dataTree = Preconditions.checkNotNull(dataTree); cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS). removalListener(cacheRemovalListener).build(); @@ -162,8 +161,7 @@ public class ShardCommitCoordinator { CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID()); if(cohortEntry == null) { cohortEntry = new CohortEntry(batched.getTransactionID(), - transactionFactory.newTransaction( - TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(), + dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID())); cohortCache.put(batched.getTransactionID(), cohortEntry); } @@ -417,15 +415,15 @@ public class ShardCommitCoordinator { private final String transactionID; private DOMStoreThreePhaseCommitCohort cohort; private final MutableCompositeModification compositeModification; - private final DOMStoreWriteTransaction transaction; + private final ReadWriteShardDataTreeTransaction transaction; private ActorRef replySender; private Shard shard; private long lastAccessTime; private boolean doImmediateCommit; - CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) { + CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { this.compositeModification = new MutableCompositeModification(); - this.transaction = transaction; + this.transaction = Preconditions.checkNotNull(transaction); this.transactionID = transactionID; } @@ -460,7 +458,7 @@ public class ShardCommitCoordinator { void applyModifications(Iterable modifications) { for(Modification modification: modifications) { compositeModification.addModification(modification); - modification.apply(transaction); + modification.apply(transaction.getSnapshot()); } }